candiduslynx commented on code in PR #35769:
URL: https://github.com/apache/arrow/pull/35769#discussion_r1238213844
##########
go/arrow/internal/testing/gen/random_array_gen.go:
##########
@@ -350,6 +350,40 @@ func (r *RandomArrayGenerator) LargeString(size int64,
minLength, maxLength int6
return bldr.NewArray()
}
+func (r *RandomArrayGenerator) StringView(size int64, minLength, maxLength
int64, nullProb float64) arrow.Array {
+ return r.generateBinaryView(arrow.BinaryTypes.StringView, size,
minLength, maxLength, nullProb)
+}
+
+func (r *RandomArrayGenerator) generateBinaryView(dt arrow.DataType, size
int64, minLength, maxLength int64, nullProb float64) arrow.Array {
+ lengths := r.Int32(size, int32(minLength), int32(maxLength),
nullProb).(*array.Int32)
+ defer lengths.Release()
+
+ bldr := array.NewBuilder(r.mem, dt).(array.StringLikeBuilder)
+ defer bldr.Release()
+
+ r.extra++
+ dist := rand.New(rand.NewSource(r.seed + r.extra))
+
+ buf := make([]byte, 0, maxLength)
+ gen := func(n int32) string {
+ out := buf[:n]
+ for i := range out {
+ out[i] = uint8(dist.Int31n(int32('z')-int32('A')+1) +
int32('A'))
+ }
+ return string(out)
+ }
+
+ for i := 0; i < lengths.Len(); i++ {
+ if lengths.IsValid(i) {
+ bldr.Append(gen(lengths.Value(i)))
+ } else {
+ bldr.AppendNull()
+ }
Review Comment:
```suggestion
if lengths.IsNull(i) {
bldr.AppendNull()
continue
}
bldr.Append(gen(lengths.Value(i)))
```
Nit: an early exit can be used here as well.
##########
go/arrow/array/binarybuilder.go:
##########
@@ -370,6 +371,327 @@ func (b *BinaryBuilder) UnmarshalJSON(data []byte) error {
return b.Unmarshal(dec)
}
+const (
+ dfltBlockSize = 1 << 20 // 1 MB
+ viewValueSizeLimit uint32 = math.MaxUint32
+)
+
+type BinaryViewBuilder struct {
+ builder
+ dtype arrow.BinaryDataType
+
+ data *memory.Buffer
+ rawData []arrow.StringHeader
+
+ blockBuilder multiBufferBuilder
+}
+
+func NewBinaryViewBuilder(mem memory.Allocator) *BinaryViewBuilder {
+ return &BinaryViewBuilder{
+ dtype: arrow.BinaryTypes.BinaryView,
+ builder: builder{
+ refCount: 1,
+ mem: mem,
+ },
+ blockBuilder: multiBufferBuilder{
+ refCount: 1,
+ blockSize: dfltBlockSize,
+ mem: mem,
+ },
+ }
+}
+
+func (b *BinaryViewBuilder) Type() arrow.DataType { return b.dtype }
+
+func (b *BinaryViewBuilder) Release() {
+ debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+
+ if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.nullBitmap != nil {
+ b.nullBitmap.Release()
+ b.nullBitmap = nil
+ }
+ if b.data != nil {
+ b.data.Release()
+ b.data = nil
+ b.rawData = nil
+ }
+ }
+}
+
+func (b *BinaryViewBuilder) init(capacity int) {
+ b.builder.init(capacity)
+ b.data = memory.NewResizableBuffer(b.mem)
+ bytesN := arrow.StringHeaderTraits.BytesRequired(capacity)
+ b.data.Resize(bytesN)
+ b.rawData = arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
+}
+
+func (b *BinaryViewBuilder) Resize(n int) {
+ nbuild := n
+ if n < minBuilderCapacity {
+ n = minBuilderCapacity
+ }
+
+ if b.capacity == 0 {
+ b.init(n)
+ } else {
+ b.builder.resize(nbuild, b.init)
+ b.data.Resize(arrow.StringHeaderTraits.BytesRequired(n))
+ b.rawData =
arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
+ }
+}
+
+func (b *BinaryViewBuilder) ReserveData(length int) {
+ if uint32(length) > viewValueSizeLimit {
+ panic(fmt.Errorf("%w: BinaryView or StringView elements cannot
reference strings larger than 4GB",
+ arrow.ErrInvalid))
+ }
+ b.blockBuilder.Reserve(int(length))
+}
+
+func (b *BinaryViewBuilder) Reserve(n int) {
+ b.builder.reserve(n, b.Resize)
+}
+
+func (b *BinaryViewBuilder) Append(v []byte) {
+ if uint32(len(v)) > viewValueSizeLimit {
+ panic(fmt.Errorf("%w: BinaryView or StringView elements cannot
reference strings larger than 4GB", arrow.ErrInvalid))
+ }
+
+ if !arrow.IsStringHeaderInline(len(v)) {
+ b.ReserveData(len(v))
+ }
+
+ b.Reserve(1)
+ b.UnsafeAppend(v)
+}
+
+// AppendString is identical to Append, only accepting a string instead
+// of a byte slice, avoiding the extra copy that would occur if you simply
+// did []byte(v).
+//
+// This is different than AppendValueFromString which exists for the
+// Builder interface, in that this expects raw binary data which is
+// appended as such. AppendValueFromString expects base64 encoded binary
+// data instead.
+func (b *BinaryViewBuilder) AppendString(v string) {
+ // create a []byte without copying the bytes
+ // in go1.20 this would be unsafe.StringData
+ val := *(*[]byte)(unsafe.Pointer(&struct {
+ string
+ int
+ }{v, len(v)}))
+ b.Append(val)
+}
+
+func (b *BinaryViewBuilder) AppendNull() {
+ b.Reserve(1)
+ b.UnsafeAppendBoolToBitmap(false)
+}
+
+func (b *BinaryViewBuilder) AppendNulls(n int) {
+ b.Reserve(n)
+ for i := 0; i < n; i++ {
+ b.UnsafeAppendBoolToBitmap(false)
+ }
+}
+
+func (b *BinaryViewBuilder) AppendEmptyValue() {
+ b.Reserve(1)
+ b.UnsafeAppendBoolToBitmap(true)
+}
+
+func (b *BinaryViewBuilder) AppendEmptyValues(n int) {
+ b.Reserve(n)
+ b.unsafeAppendBoolsToBitmap(nil, n)
+}
+
+func (b *BinaryViewBuilder) UnsafeAppend(v []byte) {
+ hdr := &b.rawData[b.length]
+ hdr.SetBytes(v)
+ if !hdr.IsInline() {
+ b.blockBuilder.UnsafeAppend(hdr, v)
+ }
+ b.UnsafeAppendBoolToBitmap(true)
+}
+
+func (b *BinaryViewBuilder) AppendValues(v [][]byte, valid []bool) {
+ if len(v) != len(valid) && len(valid) != 0 {
+ panic("len(v) != len(valid) && len(valid) != 0")
+ }
+
+ if len(v) == 0 {
+ return
+ }
+
+ b.Reserve(len(v))
+ outOfLineTotal := 0
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ if !arrow.IsStringHeaderInline(len(vv)) {
+ outOfLineTotal += len(vv)
+ }
+ }
+ }
+
+ b.ReserveData(outOfLineTotal)
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ hdr := &b.rawData[b.length+i]
+ hdr.SetBytes(vv)
+ if !hdr.IsInline() {
+ b.blockBuilder.UnsafeAppend(hdr, vv)
+ }
+ }
+ }
+
+ b.builder.unsafeAppendBoolsToBitmap(valid, len(v))
+}
+
+func (b *BinaryViewBuilder) AppendStringValues(v []string, valid []bool) {
+ if len(v) != len(valid) && len(valid) != 0 {
+ panic("len(v) != len(valid) && len(valid) != 0")
+ }
+
+ if len(v) == 0 {
+ return
+ }
+
+ b.Reserve(len(v))
+ outOfLineTotal := 0
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ if !arrow.IsStringHeaderInline(len(vv)) {
+ outOfLineTotal += len(vv)
+ }
+ }
+ }
+
+ b.ReserveData(outOfLineTotal)
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ hdr := &b.rawData[b.length+i]
+ hdr.SetString(vv)
+ if !hdr.IsInline() {
+ b.blockBuilder.UnsafeAppendString(hdr, vv)
+ }
+ }
+ }
+
+ b.builder.unsafeAppendBoolsToBitmap(valid, len(v))
+}
+
+// AppendValueFromString is paired with ValueStr for fulfilling the
+// base Builder interface. This is intended to read in a human-readable
+// string such as from CSV or JSON and append it to the array.
+//
+// For Binary values are expected to be base64 encoded (and will be
+// decoded as such before being appended).
+func (b *BinaryViewBuilder) AppendValueFromString(s string) error {
+ if s == NullValueStr {
+ b.AppendNull()
+ return nil
+ }
+
+ if b.dtype.IsUtf8() {
+ b.Append([]byte(s))
+ return nil
+ }
+
+ decodedVal, err := base64.StdEncoding.DecodeString(s)
+ if err != nil {
+ return fmt.Errorf("could not decode base64 string: %w", err)
+ }
+ b.Append(decodedVal)
+ return nil
+}
+
+func (b *BinaryViewBuilder) UnmarshalOne(dec *json.Decoder) error {
+ t, err := dec.Token()
+ if err != nil {
+ return err
+ }
+
+ switch v := t.(type) {
+ case string:
+ data, err := base64.StdEncoding.DecodeString(v)
+ if err != nil {
+ return err
+ }
+ b.Append(data)
+ case []byte:
+ b.Append(v)
+ case nil:
+ b.AppendNull()
+ default:
+ return &json.UnmarshalTypeError{
+ Value: fmt.Sprint(t),
+ Type: reflect.TypeOf([]byte{}),
+ Offset: dec.InputOffset(),
+ }
+ }
+ return nil
+}
+
+func (b *BinaryViewBuilder) Unmarshal(dec *json.Decoder) error {
+ for dec.More() {
+ if err := b.UnmarshalOne(dec); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (b *BinaryViewBuilder) UnmarshalJSON(data []byte) error {
+ dec := json.NewDecoder(bytes.NewReader(data))
+ t, err := dec.Token()
+ if err != nil {
+ return err
+ }
+
+ if delim, ok := t.(json.Delim); !ok || delim != '[' {
+ return fmt.Errorf("binary view builder must unpack from json
array, found %s", delim)
+ }
+
+ return b.Unmarshal(dec)
+}
+
+func (b *BinaryViewBuilder) newData() (data *Data) {
+ bytesRequired := arrow.StringHeaderTraits.BytesRequired(b.length)
+ if bytesRequired > 0 && bytesRequired < b.data.Len() {
+ // trim buffers
+ b.data.Resize(bytesRequired)
+ }
+
+ dataBuffers := b.blockBuilder.Finish()
+ data = NewData(b.dtype, b.length, append([]*memory.Buffer{
+ b.nullBitmap, b.data}, dataBuffers...), nil, b.nulls, 0)
+ b.reset()
+
+ if b.data != nil {
+ b.data.Release()
+ b.data = nil
Review Comment:
Should `b.nullBitmap` also be released somewhere around here?
##########
go/arrow/internal/arrjson/arrjson.go:
##########
@@ -1024,6 +1033,30 @@ func arrayFromJSON(mem memory.Allocator, dt
arrow.DataType, arr Array) arrow.Arr
bldr.AppendValues(data, valids)
return returnNewArrayData(bldr)
+ case *arrow.BinaryViewType:
Review Comment:
Can you use the interface instead? The two cases are identical.
##########
go/arrow/internal/flatbuf/RecordBatch.go:
##########
@@ -128,8 +128,42 @@ func (rcv *RecordBatch) Compression(obj *BodyCompression)
*BodyCompression {
}
/// Optional compression of the message body
+/// Some types such as Utf8View are represented using a variable number of
buffers.
+/// For each such Field in the pre-ordered flattened logical schema, there
will be
+/// an entry in variadicCounts to indicate the number of extra buffers which
belong
+/// to that Field.
+func (rcv *RecordBatch) VariadicCounts(j int) int64 {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.GetInt64(a + flatbuffers.UOffsetT(j*8))
+ }
+ return 0
Review Comment:
Better to exit early:
```suggestion
if o == 0 {
return 0
}
return rcv._tab.GetInt64(rcv._tab.Vector(o) + flatbuffers.UOffsetT(j*8))
```
##########
go/arrow/internal/flatbuf/RecordBatch.go:
##########
@@ -128,8 +128,42 @@ func (rcv *RecordBatch) Compression(obj *BodyCompression)
*BodyCompression {
}
/// Optional compression of the message body
+/// Some types such as Utf8View are represented using a variable number of
buffers.
+/// For each such Field in the pre-ordered flattened logical schema, there
will be
+/// an entry in variadicCounts to indicate the number of extra buffers which
belong
+/// to that Field.
+func (rcv *RecordBatch) VariadicCounts(j int) int64 {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.GetInt64(a + flatbuffers.UOffsetT(j*8))
+ }
+ return 0
+}
+
+func (rcv *RecordBatch) VariadicCountsLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+/// Some types such as Utf8View are represented using a variable number of
buffers.
+/// For each such Field in the pre-ordered flattened logical schema, there
will be
+/// an entry in variadicCounts to indicate the number of extra buffers which
belong
+/// to that Field.
+func (rcv *RecordBatch) MutateVariadicCounts(j int, n int64) bool {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.MutateInt64(a+flatbuffers.UOffsetT(j*8), n)
+ }
+ return false
Review Comment:
Exit early:
```suggestion
if o == 0 {
return false
}
return rcv._tab.MutateInt64(rcv._tab.Vector(o)+flatbuffers.UOffsetT(j*8), n)
```
##########
go/arrow/internal/arrjson/arrjson.go:
##########
@@ -2179,3 +2230,113 @@ func durationToJSON(arr *array.Duration) []interface{} {
}
return o
}
+
+func variadicBuffersFromJSON(bufs []string) []*memory.Buffer {
+ out := make([]*memory.Buffer, len(bufs))
+ for i, data := range bufs {
+ rawData, err := hex.DecodeString(data)
+ if err != nil {
+ panic(err)
+ }
+
+ out[i] = memory.NewBufferBytes(rawData)
+ }
+ return out
+}
+
+func variadicBuffersToJSON(bufs []*memory.Buffer) []string {
+ out := make([]string, len(bufs))
+ for i, data := range bufs {
+ out[i] = strings.ToUpper(hex.EncodeToString(data.Bytes()))
+ }
+ return out
+}
+
+func stringHeadersFromJSON(mem memory.Allocator, isBinary bool, data
[]interface{}) *memory.Buffer {
+ buf := memory.NewResizableBuffer(mem)
+ buf.Resize(arrow.StringHeaderTraits.BytesRequired(len(data)))
+
+ values := arrow.StringHeaderTraits.CastFromBytes(buf.Bytes())
+
+ for i, d := range data {
+ switch v := d.(type) {
+ case nil:
+ continue
+ case map[string]interface{}:
+ if inlined, ok := v["INLINED"]; ok {
+ if isBinary {
+ val, err :=
hex.DecodeString(inlined.(string))
+ if err != nil {
+ panic(fmt.Errorf("could not
decode %v: %v", inlined, err))
+ }
+ values[i].SetBytes(val)
+ } else {
+ values[i].SetString(inlined.(string))
+ }
+ continue
+ }
+
+ idx, offset := v["BUFFER_INDEX"].(json.Number),
v["OFFSET"].(json.Number)
+ bufIdx, err := idx.Int64()
+ if err != nil {
+ panic(err)
+ }
+
+ bufOffset, err := offset.Int64()
+ if err != nil {
+ panic(err)
+ }
+
+ values[i].SetIndexOffset(uint32(bufIdx),
uint32(bufOffset))
+ prefix, err := hex.DecodeString(v["PREFIX"].(string))
+ if err != nil {
+ panic(err)
+ }
+ sz, err := v["SIZE"].(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+
+ rawData := make([]byte, sz)
+ copy(rawData, prefix)
+ values[i].SetBytes(rawData)
+ }
+ }
+ return buf
+}
+
+func stringHeadersToJSON(arr array.ViewLike, isBinary bool) []interface{} {
+ type StringHeader struct {
+ Size int `json:"SIZE"`
+ Prefix *string `json:"PREFIX,omitempty"`
+ BufferIdx *int `json:"BUFFER_INDEX,omitempty"`
+ BufferOff *int `json:"OFFSET,omitempty"`
+ Inlined *string `json:"INLINED,omitempty"`
+ }
+
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ hdr := arr.ValueHeader(i)
+ if hdr.IsInline() {
+ data := hdr.InlineData()
+ if isBinary {
+ data =
strings.ToUpper(hex.EncodeToString(hdr.InlineBytes()))
+ }
+ o[i] = StringHeader{
+ Size: hdr.Len(),
+ Inlined: &data,
+ }
+ } else {
+ idx, off := int(hdr.BufferIndex()),
int(hdr.BufferOffset())
+ prefix := hdr.Prefix()
+ encodedPrefix :=
strings.ToUpper(hex.EncodeToString(prefix[:]))
Review Comment:
Is `ToUpper` necessary?
##########
go/arrow/internal/arrjson/arrjson.go:
##########
@@ -1394,6 +1427,24 @@ func arrayToJSON(field arrow.Field, arr arrow.Array)
Array {
Offset: strOffsets,
}
+ case *array.StringView:
+ variadic := variadicBuffersToJSON(arr.Data().Buffers()[2:])
Review Comment:
Same here: use the interface, as these cases are identical.
##########
go/arrow/internal/arrjson/arrjson.go:
##########
@@ -2179,3 +2230,113 @@ func durationToJSON(arr *array.Duration) []interface{} {
}
return o
}
+
+func variadicBuffersFromJSON(bufs []string) []*memory.Buffer {
+ out := make([]*memory.Buffer, len(bufs))
+ for i, data := range bufs {
+ rawData, err := hex.DecodeString(data)
+ if err != nil {
+ panic(err)
+ }
+
+ out[i] = memory.NewBufferBytes(rawData)
+ }
+ return out
+}
+
+func variadicBuffersToJSON(bufs []*memory.Buffer) []string {
+ out := make([]string, len(bufs))
+ for i, data := range bufs {
+ out[i] = strings.ToUpper(hex.EncodeToString(data.Bytes()))
+ }
+ return out
+}
+
+func stringHeadersFromJSON(mem memory.Allocator, isBinary bool, data
[]interface{}) *memory.Buffer {
+ buf := memory.NewResizableBuffer(mem)
+ buf.Resize(arrow.StringHeaderTraits.BytesRequired(len(data)))
+
+ values := arrow.StringHeaderTraits.CastFromBytes(buf.Bytes())
+
+ for i, d := range data {
+ switch v := d.(type) {
+ case nil:
+ continue
+ case map[string]interface{}:
+ if inlined, ok := v["INLINED"]; ok {
+ if isBinary {
+ val, err :=
hex.DecodeString(inlined.(string))
+ if err != nil {
+ panic(fmt.Errorf("could not
decode %v: %v", inlined, err))
+ }
+ values[i].SetBytes(val)
+ } else {
+ values[i].SetString(inlined.(string))
+ }
+ continue
+ }
+
+ idx, offset := v["BUFFER_INDEX"].(json.Number),
v["OFFSET"].(json.Number)
+ bufIdx, err := idx.Int64()
+ if err != nil {
+ panic(err)
+ }
+
+ bufOffset, err := offset.Int64()
+ if err != nil {
+ panic(err)
+ }
+
+ values[i].SetIndexOffset(uint32(bufIdx),
uint32(bufOffset))
+ prefix, err := hex.DecodeString(v["PREFIX"].(string))
+ if err != nil {
+ panic(err)
+ }
+ sz, err := v["SIZE"].(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+
+ rawData := make([]byte, sz)
+ copy(rawData, prefix)
+ values[i].SetBytes(rawData)
+ }
+ }
+ return buf
+}
+
+func stringHeadersToJSON(arr array.ViewLike, isBinary bool) []interface{} {
+ type StringHeader struct {
+ Size int `json:"SIZE"`
+ Prefix *string `json:"PREFIX,omitempty"`
+ BufferIdx *int `json:"BUFFER_INDEX,omitempty"`
+ BufferOff *int `json:"OFFSET,omitempty"`
+ Inlined *string `json:"INLINED,omitempty"`
+ }
+
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ hdr := arr.ValueHeader(i)
+ if hdr.IsInline() {
+ data := hdr.InlineData()
+ if isBinary {
+ data =
strings.ToUpper(hex.EncodeToString(hdr.InlineBytes()))
+ }
+ o[i] = StringHeader{
+ Size: hdr.Len(),
+ Inlined: &data,
+ }
+ } else {
Review Comment:
You can use `continue` here and drop the `else` nesting.
##########
go/arrow/internal/arrjson/arrjson.go:
##########
@@ -2179,3 +2230,113 @@ func durationToJSON(arr *array.Duration) []interface{} {
}
return o
}
+
+func variadicBuffersFromJSON(bufs []string) []*memory.Buffer {
+ out := make([]*memory.Buffer, len(bufs))
+ for i, data := range bufs {
+ rawData, err := hex.DecodeString(data)
+ if err != nil {
+ panic(err)
+ }
+
+ out[i] = memory.NewBufferBytes(rawData)
+ }
+ return out
+}
+
+func variadicBuffersToJSON(bufs []*memory.Buffer) []string {
+ out := make([]string, len(bufs))
+ for i, data := range bufs {
+ out[i] = strings.ToUpper(hex.EncodeToString(data.Bytes()))
+ }
+ return out
+}
+
+func stringHeadersFromJSON(mem memory.Allocator, isBinary bool, data
[]interface{}) *memory.Buffer {
+ buf := memory.NewResizableBuffer(mem)
+ buf.Resize(arrow.StringHeaderTraits.BytesRequired(len(data)))
+
+ values := arrow.StringHeaderTraits.CastFromBytes(buf.Bytes())
+
+ for i, d := range data {
+ switch v := d.(type) {
+ case nil:
+ continue
+ case map[string]interface{}:
+ if inlined, ok := v["INLINED"]; ok {
+ if isBinary {
+ val, err :=
hex.DecodeString(inlined.(string))
+ if err != nil {
+ panic(fmt.Errorf("could not
decode %v: %v", inlined, err))
+ }
+ values[i].SetBytes(val)
+ } else {
+ values[i].SetString(inlined.(string))
+ }
+ continue
+ }
+
+ idx, offset := v["BUFFER_INDEX"].(json.Number),
v["OFFSET"].(json.Number)
+ bufIdx, err := idx.Int64()
+ if err != nil {
+ panic(err)
+ }
+
+ bufOffset, err := offset.Int64()
+ if err != nil {
+ panic(err)
+ }
+
+ values[i].SetIndexOffset(uint32(bufIdx),
uint32(bufOffset))
+ prefix, err := hex.DecodeString(v["PREFIX"].(string))
+ if err != nil {
+ panic(err)
+ }
+ sz, err := v["SIZE"].(json.Number).Int64()
+ if err != nil {
+ panic(err)
+ }
+
+ rawData := make([]byte, sz)
+ copy(rawData, prefix)
+ values[i].SetBytes(rawData)
+ }
+ }
+ return buf
+}
+
+func stringHeadersToJSON(arr array.ViewLike, isBinary bool) []interface{} {
+ type StringHeader struct {
+ Size int `json:"SIZE"`
+ Prefix *string `json:"PREFIX,omitempty"`
+ BufferIdx *int `json:"BUFFER_INDEX,omitempty"`
+ BufferOff *int `json:"OFFSET,omitempty"`
+ Inlined *string `json:"INLINED,omitempty"`
+ }
+
+ o := make([]interface{}, arr.Len())
+ for i := range o {
+ hdr := arr.ValueHeader(i)
+ if hdr.IsInline() {
+ data := hdr.InlineData()
+ if isBinary {
+ data =
strings.ToUpper(hex.EncodeToString(hdr.InlineBytes()))
Review Comment:
Is `ToUpper` necessary?
##########
go/arrow/array/binarybuilder.go:
##########
@@ -370,6 +371,327 @@ func (b *BinaryBuilder) UnmarshalJSON(data []byte) error {
return b.Unmarshal(dec)
}
+const (
+ dfltBlockSize = 1 << 20 // 1 MB
+ viewValueSizeLimit uint32 = math.MaxUint32
+)
+
+type BinaryViewBuilder struct {
+ builder
+ dtype arrow.BinaryDataType
+
+ data *memory.Buffer
+ rawData []arrow.StringHeader
+
+ blockBuilder multiBufferBuilder
+}
+
+func NewBinaryViewBuilder(mem memory.Allocator) *BinaryViewBuilder {
+ return &BinaryViewBuilder{
+ dtype: arrow.BinaryTypes.BinaryView,
+ builder: builder{
+ refCount: 1,
+ mem: mem,
+ },
+ blockBuilder: multiBufferBuilder{
+ refCount: 1,
+ blockSize: dfltBlockSize,
+ mem: mem,
+ },
+ }
+}
+
+func (b *BinaryViewBuilder) Type() arrow.DataType { return b.dtype }
+
+func (b *BinaryViewBuilder) Release() {
+ debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+
+ if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.nullBitmap != nil {
+ b.nullBitmap.Release()
+ b.nullBitmap = nil
+ }
+ if b.data != nil {
+ b.data.Release()
+ b.data = nil
+ b.rawData = nil
+ }
+ }
Review Comment:
I feel that early exits should be used more heavily in the `Release()`
implementations, like so:
```suggestion
if atomic.AddInt64(&b.refCount, -1) != 0 {
return
}
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
}
if b.data != nil {
b.data.Release()
b.data = nil
b.rawData = nil
}
```
##########
go/arrow/array/bufferbuilder.go:
##########
@@ -151,3 +153,115 @@ func (b *bufferBuilder) unsafeAppend(data []byte) {
copy(b.bytes[b.length:], data)
b.length += len(data)
}
+
+type multiBufferBuilder struct {
+ refCount int64
+ blockSize int
+
+ mem memory.Allocator
+ blocks []*memory.Buffer
+ currentOutBuffer int
+}
+
+// Retain increases the reference count by 1.
+// Retain may be called simultaneously from multiple goroutines.
+func (b *multiBufferBuilder) Retain() {
+ atomic.AddInt64(&b.refCount, 1)
+}
+
+// Release decreases the reference count by 1.
+// When the reference count goes to zero, the memory is freed.
+// Release may be called simultaneously from multiple goroutines.
+func (b *multiBufferBuilder) Release() {
+ debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+
+ if atomic.AddInt64(&b.refCount, -1) == 0 {
+ for i, buf := range b.blocks {
+ buf.Release()
+ b.blocks[i] = nil
+ }
+ }
Review Comment:
```suggestion
if atomic.AddInt64(&b.refCount, -1) == 0 {
b.Reset()
}
```
##########
go/arrow/array/binarybuilder.go:
##########
@@ -370,6 +371,327 @@ func (b *BinaryBuilder) UnmarshalJSON(data []byte) error {
return b.Unmarshal(dec)
}
+const (
+ dfltBlockSize = 1 << 20 // 1 MB
+ viewValueSizeLimit uint32 = math.MaxUint32
+)
+
+type BinaryViewBuilder struct {
+ builder
+ dtype arrow.BinaryDataType
+
+ data *memory.Buffer
+ rawData []arrow.StringHeader
+
+ blockBuilder multiBufferBuilder
+}
+
+func NewBinaryViewBuilder(mem memory.Allocator) *BinaryViewBuilder {
+ return &BinaryViewBuilder{
+ dtype: arrow.BinaryTypes.BinaryView,
+ builder: builder{
+ refCount: 1,
+ mem: mem,
+ },
+ blockBuilder: multiBufferBuilder{
+ refCount: 1,
+ blockSize: dfltBlockSize,
+ mem: mem,
+ },
+ }
+}
+
+func (b *BinaryViewBuilder) Type() arrow.DataType { return b.dtype }
+
+func (b *BinaryViewBuilder) Release() {
+ debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+
+ if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.nullBitmap != nil {
+ b.nullBitmap.Release()
+ b.nullBitmap = nil
+ }
+ if b.data != nil {
+ b.data.Release()
+ b.data = nil
+ b.rawData = nil
+ }
+ }
+}
+
+func (b *BinaryViewBuilder) init(capacity int) {
+ b.builder.init(capacity)
+ b.data = memory.NewResizableBuffer(b.mem)
+ bytesN := arrow.StringHeaderTraits.BytesRequired(capacity)
+ b.data.Resize(bytesN)
+ b.rawData = arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
+}
+
+func (b *BinaryViewBuilder) Resize(n int) {
+ nbuild := n
+ if n < minBuilderCapacity {
+ n = minBuilderCapacity
+ }
+
+ if b.capacity == 0 {
+ b.init(n)
+ } else {
+ b.builder.resize(nbuild, b.init)
+ b.data.Resize(arrow.StringHeaderTraits.BytesRequired(n))
+ b.rawData =
arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
+ }
Review Comment:
```suggestion
if b.capacity == 0 {
b.init(n)
return
}
b.builder.resize(nbuild, b.init)
b.data.Resize(arrow.StringHeaderTraits.BytesRequired(n))
b.rawData = arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
```
##########
go/arrow/array/binarybuilder.go:
##########
@@ -370,6 +371,327 @@ func (b *BinaryBuilder) UnmarshalJSON(data []byte) error {
return b.Unmarshal(dec)
}
+const (
+ dfltBlockSize = 1 << 20 // 1 MB
+ viewValueSizeLimit uint32 = math.MaxUint32
+)
+
+type BinaryViewBuilder struct {
+ builder
+ dtype arrow.BinaryDataType
+
+ data *memory.Buffer
+ rawData []arrow.StringHeader
+
+ blockBuilder multiBufferBuilder
+}
+
+func NewBinaryViewBuilder(mem memory.Allocator) *BinaryViewBuilder {
+ return &BinaryViewBuilder{
+ dtype: arrow.BinaryTypes.BinaryView,
+ builder: builder{
+ refCount: 1,
+ mem: mem,
+ },
+ blockBuilder: multiBufferBuilder{
+ refCount: 1,
+ blockSize: dfltBlockSize,
+ mem: mem,
+ },
+ }
+}
+
+func (b *BinaryViewBuilder) Type() arrow.DataType { return b.dtype }
+
+func (b *BinaryViewBuilder) Release() {
+ debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+
+ if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.nullBitmap != nil {
+ b.nullBitmap.Release()
+ b.nullBitmap = nil
+ }
+ if b.data != nil {
+ b.data.Release()
+ b.data = nil
+ b.rawData = nil
+ }
+ }
+}
+
+func (b *BinaryViewBuilder) init(capacity int) {
+ b.builder.init(capacity)
+ b.data = memory.NewResizableBuffer(b.mem)
+ bytesN := arrow.StringHeaderTraits.BytesRequired(capacity)
+ b.data.Resize(bytesN)
+ b.rawData = arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
+}
+
+func (b *BinaryViewBuilder) Resize(n int) {
+ nbuild := n
+ if n < minBuilderCapacity {
+ n = minBuilderCapacity
+ }
+
+ if b.capacity == 0 {
+ b.init(n)
+ } else {
+ b.builder.resize(nbuild, b.init)
+ b.data.Resize(arrow.StringHeaderTraits.BytesRequired(n))
+ b.rawData =
arrow.StringHeaderTraits.CastFromBytes(b.data.Bytes())
+ }
+}
+
+func (b *BinaryViewBuilder) ReserveData(length int) {
+ if uint32(length) > viewValueSizeLimit {
+ panic(fmt.Errorf("%w: BinaryView or StringView elements cannot
reference strings larger than 4GB",
+ arrow.ErrInvalid))
+ }
+ b.blockBuilder.Reserve(int(length))
+}
+
+func (b *BinaryViewBuilder) Reserve(n int) {
+ b.builder.reserve(n, b.Resize)
+}
+
+func (b *BinaryViewBuilder) Append(v []byte) {
+ if uint32(len(v)) > viewValueSizeLimit {
+ panic(fmt.Errorf("%w: BinaryView or StringView elements cannot
reference strings larger than 4GB", arrow.ErrInvalid))
+ }
+
+ if !arrow.IsStringHeaderInline(len(v)) {
+ b.ReserveData(len(v))
+ }
+
+ b.Reserve(1)
+ b.UnsafeAppend(v)
+}
+
+// AppendString is identical to Append, only accepting a string instead
+// of a byte slice, avoiding the extra copy that would occur if you simply
+// did []byte(v).
+//
+// This is different than AppendValueFromString which exists for the
+// Builder interface, in that this expects raw binary data which is
+// appended as such. AppendValueFromString expects base64 encoded binary
+// data instead.
+func (b *BinaryViewBuilder) AppendString(v string) {
+ // create a []byte without copying the bytes
+ // in go1.20 this would be unsafe.StringData
+ val := *(*[]byte)(unsafe.Pointer(&struct {
+ string
+ int
+ }{v, len(v)}))
+ b.Append(val)
+}
+
+func (b *BinaryViewBuilder) AppendNull() {
+ b.Reserve(1)
+ b.UnsafeAppendBoolToBitmap(false)
+}
+
+func (b *BinaryViewBuilder) AppendNulls(n int) {
+ b.Reserve(n)
+ for i := 0; i < n; i++ {
+ b.UnsafeAppendBoolToBitmap(false)
+ }
+}
+
+func (b *BinaryViewBuilder) AppendEmptyValue() {
+ b.Reserve(1)
+ b.UnsafeAppendBoolToBitmap(true)
+}
+
+func (b *BinaryViewBuilder) AppendEmptyValues(n int) {
+ b.Reserve(n)
+ b.unsafeAppendBoolsToBitmap(nil, n)
+}
+
+func (b *BinaryViewBuilder) UnsafeAppend(v []byte) {
+ hdr := &b.rawData[b.length]
+ hdr.SetBytes(v)
+ if !hdr.IsInline() {
+ b.blockBuilder.UnsafeAppend(hdr, v)
+ }
+ b.UnsafeAppendBoolToBitmap(true)
+}
+
+func (b *BinaryViewBuilder) AppendValues(v [][]byte, valid []bool) {
+ if len(v) != len(valid) && len(valid) != 0 {
+ panic("len(v) != len(valid) && len(valid) != 0")
+ }
+
+ if len(v) == 0 {
+ return
+ }
+
+ b.Reserve(len(v))
+ outOfLineTotal := 0
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ if !arrow.IsStringHeaderInline(len(vv)) {
+ outOfLineTotal += len(vv)
+ }
+ }
+ }
+
+ b.ReserveData(outOfLineTotal)
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ hdr := &b.rawData[b.length+i]
+ hdr.SetBytes(vv)
+ if !hdr.IsInline() {
+ b.blockBuilder.UnsafeAppend(hdr, vv)
+ }
+ }
+ }
+
+ b.builder.unsafeAppendBoolsToBitmap(valid, len(v))
+}
+
+func (b *BinaryViewBuilder) AppendStringValues(v []string, valid []bool) {
+ if len(v) != len(valid) && len(valid) != 0 {
+ panic("len(v) != len(valid) && len(valid) != 0")
+ }
+
+ if len(v) == 0 {
+ return
+ }
+
+ b.Reserve(len(v))
+ outOfLineTotal := 0
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ if !arrow.IsStringHeaderInline(len(vv)) {
+ outOfLineTotal += len(vv)
+ }
+ }
+ }
+
+ b.ReserveData(outOfLineTotal)
+ for i, vv := range v {
+ if len(valid) == 0 || valid[i] {
+ hdr := &b.rawData[b.length+i]
+ hdr.SetString(vv)
+ if !hdr.IsInline() {
+ b.blockBuilder.UnsafeAppendString(hdr, vv)
+ }
+ }
+ }
+
+ b.builder.unsafeAppendBoolsToBitmap(valid, len(v))
+}
+
+// AppendValueFromString is paired with ValueStr for fulfilling the
+// base Builder interface. This is intended to read in a human-readable
+// string such as from CSV or JSON and append it to the array.
+//
+// For Binary values are expected to be base64 encoded (and will be
+// decoded as such before being appended).
+func (b *BinaryViewBuilder) AppendValueFromString(s string) error {
+ if s == NullValueStr {
+ b.AppendNull()
+ return nil
+ }
+
+ if b.dtype.IsUtf8() {
+ b.Append([]byte(s))
+ return nil
+ }
+
+ decodedVal, err := base64.StdEncoding.DecodeString(s)
+ if err != nil {
+ return fmt.Errorf("could not decode base64 string: %w", err)
+ }
+ b.Append(decodedVal)
+ return nil
+}
+
+func (b *BinaryViewBuilder) UnmarshalOne(dec *json.Decoder) error {
+ t, err := dec.Token()
+ if err != nil {
+ return err
+ }
+
+ switch v := t.(type) {
+ case string:
+ data, err := base64.StdEncoding.DecodeString(v)
+ if err != nil {
+ return err
+ }
+ b.Append(data)
+ case []byte:
+ b.Append(v)
+ case nil:
+ b.AppendNull()
+ default:
+ return &json.UnmarshalTypeError{
+ Value: fmt.Sprint(t),
+ Type: reflect.TypeOf([]byte{}),
+ Offset: dec.InputOffset(),
+ }
+ }
+ return nil
+}
+
+func (b *BinaryViewBuilder) Unmarshal(dec *json.Decoder) error {
+ for dec.More() {
+ if err := b.UnmarshalOne(dec); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (b *BinaryViewBuilder) UnmarshalJSON(data []byte) error {
+ dec := json.NewDecoder(bytes.NewReader(data))
+ t, err := dec.Token()
+ if err != nil {
+ return err
+ }
+
+ if delim, ok := t.(json.Delim); !ok || delim != '[' {
+ return fmt.Errorf("binary view builder must unpack from json
array, found %s", delim)
+ }
+
+ return b.Unmarshal(dec)
+}
+
+func (b *BinaryViewBuilder) newData() (data *Data) {
+ bytesRequired := arrow.StringHeaderTraits.BytesRequired(b.length)
+ if bytesRequired > 0 && bytesRequired < b.data.Len() {
+ // trim buffers
+ b.data.Resize(bytesRequired)
+ }
+
+ dataBuffers := b.blockBuilder.Finish()
+ data = NewData(b.dtype, b.length, append([]*memory.Buffer{
+ b.nullBitmap, b.data}, dataBuffers...), nil, b.nulls, 0)
+ b.reset()
+
+ if b.data != nil {
+ b.data.Release()
+ b.data = nil
+ b.rawData = nil
+ for _, buf := range dataBuffers {
+ buf.Release()
+ }
+ }
+ return
+}
+
+func (b *BinaryViewBuilder) NewBinaryViewArray() (a *BinaryView) {
+ data := b.newData()
+ a = NewBinaryViewData(data)
+ data.Release()
+ return
Review Comment:
```suggestion
data := b.newData()
defer data.Release()
return NewBinaryViewData(data)
```
?
##########
go/arrow/datatype_stringheader.go:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package arrow
+
+import (
+ "bytes"
+ "unsafe"
+
+ "github.com/apache/arrow/go/v13/arrow/endian"
+ "github.com/apache/arrow/go/v13/arrow/internal/debug"
+ "github.com/apache/arrow/go/v13/arrow/memory"
+)
+
+const (
+ StringHeaderPrefixLen = 4
+ stringHeaderInlineSize = 12
+)
+
+func IsStringHeaderInline(length int) bool {
+ return length < stringHeaderInlineSize
+}
+
+// StringHeader is a variable length string (utf8) or byte slice with
+// a 4 byte prefix and inline optimization for small values (12 bytes
+// or fewer). This is similar to Go's standard string but limited by
Review Comment:
```suggestion
// or fewer). This is similar to Go's standard string but limited by
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]