This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 2b72399 fix: move from atomic.(Add|Load|Store) to atomic.Int64{}
(#326)
2b72399 is described below
commit 2b72399254989b62f09fd66a1a6c2c38863b418d
Author: Chris Pahl <[email protected]>
AuthorDate: Fri Apr 4 17:58:54 2025 +0200
fix: move from atomic.(Add|Load|Store) to atomic.Int64{} (#326)
### Rationale for this change
This is a follow-up to https://github.com/apache/arrow-go/pull/323.
32-bit ARM requires 64-bit atomic operations to act on 64-bit-aligned
addresses. This has not been the case in a rather large number of places
in the code base. Failing to align causes crashes at runtime.
### What changes are included in this PR?
I replaced all uses of `atomic.LoadInt64`, `atomic.StoreInt64` and
`atomic.AddInt64` with `atomic.Int64{}` and its methods. This type has
built-in alignment, so it does not matter in which order struct fields
appear, which makes it generally harder to screw up alignment during
changes.
### Are these changes tested?
Briefly, on a 32-bit architecture, with the same program mentioned in
#323. The best way to move forward here would be to also add a 32-bit CI
environment to make sure `arrow` builds and passes its tests there. But I
guess this is out of scope for this PR.
I've noticed that `go vet ./...` produces many warnings across the
project, most of them already there before this PR. The new ones are all
of this kind:
```
arrow/array/dictionary.go:614:42: literal copies lock value from bldr:
github.com/apache/arrow-go/v18/arrow/array.dictionaryBuilder contains
github.com/apache/arrow-go/v18/arrow/array.builder contains sync/atomic.Int64
contains sync/atomic.noCopy
```
I.e. false positives: the builder is initialized as a struct value and
then copied into the struct that wraps it inside a constructor function.
### Are there any user-facing changes?
In the hope that everything was done right: no.
---
arrow/Makefile | 2 +-
arrow/array/array.go | 12 +-
arrow/array/binary.go | 7 +-
arrow/array/binarybuilder.go | 28 +-
arrow/array/boolean.go | 2 +-
arrow/array/booleanbuilder.go | 13 +-
arrow/array/bufferbuilder.go | 16 +-
arrow/array/bufferbuilder_byte.go | 4 +-
arrow/array/bufferbuilder_numeric.gen.go | 12 +-
arrow/array/bufferbuilder_numeric.gen.go.tmpl | 4 +-
arrow/array/builder.go | 4 +-
arrow/array/concat.go | 4 +-
arrow/array/data.go | 15 +-
arrow/array/decimal.go | 25 +-
arrow/array/dictionary.go | 30 +-
arrow/array/encoded.go | 13 +-
arrow/array/extension.go | 2 +-
arrow/array/fixed_size_list.go | 21 +-
arrow/array/fixedsize_binary.go | 8 +-
arrow/array/fixedsize_binarybuilder.go | 12 +-
arrow/array/float16.go | 2 +-
arrow/array/float16_builder.go | 9 +-
arrow/array/interval.go | 31 +-
arrow/array/json_reader.go | 23 +-
arrow/array/list.go | 73 +++--
arrow/array/map.go | 2 +-
arrow/array/null.go | 13 +-
arrow/array/numeric_generic.go | 2 +-
arrow/array/numericbuilder.gen.go | 121 +++++---
arrow/array/numericbuilder.gen.go.tmpl | 12 +-
arrow/array/numericbuilder.gen_test.go | 315 +++++++++++++++++++++
arrow/array/numericbuilder.gen_test.go.tmpl | 25 ++
arrow/array/record.go | 49 ++--
arrow/array/string.go | 6 +-
arrow/array/struct.go | 11 +-
arrow/array/table.go | 48 ++--
arrow/array/timestamp.go | 11 +-
arrow/array/union.go | 17 +-
arrow/avro/reader.go | 11 +-
arrow/cdata/cdata.go | 9 +-
arrow/cdata/import_allocator.go | 8 +-
arrow/compute/exec/utils.go | 9 +-
arrow/csv/reader.go | 20 +-
arrow/flight/flightsql/example/sql_batch_reader.go | 37 +--
.../example/sqlite_tables_schema_batch_reader.go | 15 +-
arrow/flight/record_batch_reader.go | 11 +-
arrow/internal/arrjson/reader.go | 14 +-
arrow/ipc/message.go | 44 +--
arrow/memory/buffer.go | 22 +-
arrow/memory/checked_allocator.go | 22 +-
arrow/table.go | 28 +-
arrow/tensor/tensor.go | 22 +-
arrow/tmpl | Bin 0 -> 6565444 bytes
parquet/file/record_reader.go | 50 ++--
parquet/pqarrow/column_readers.go | 46 +--
parquet/pqarrow/file_reader.go | 20 +-
56 files changed, 924 insertions(+), 468 deletions(-)
diff --git a/arrow/Makefile b/arrow/Makefile
index 9c4a232..c7e3270 100644
--- a/arrow/Makefile
+++ b/arrow/Makefile
@@ -30,7 +30,7 @@ assembly:
@$(MAKE) -C math assembly
generate: bin/tmpl
- bin/tmpl -i -data=numeric.tmpldata type_traits_numeric.gen.go.tmpl
type_traits_numeric.gen_test.go.tmpl array/numeric.gen.go.tmpl
array/numericbuilder.gen_test.go.tmpl array/numericbuilder.gen.go.tmpl
array/bufferbuilder_numeric.gen.go.tmpl
+ bin/tmpl -i -data=numeric.tmpldata type_traits_numeric.gen.go.tmpl
type_traits_numeric.gen_test.go.tmpl array/numericbuilder.gen_test.go.tmpl
array/numericbuilder.gen.go.tmpl array/bufferbuilder_numeric.gen.go.tmpl
bin/tmpl -i -data=datatype_numeric.gen.go.tmpldata
datatype_numeric.gen.go.tmpl
@$(MAKE) -C math generate
diff --git a/arrow/array/array.go b/arrow/array/array.go
index 6e281a4..947b44f 100644
--- a/arrow/array/array.go
+++ b/arrow/array/array.go
@@ -35,7 +35,7 @@ const (
)
type array struct {
- refCount int64
+ refCount atomic.Int64
data *Data
nullBitmapBytes []byte
}
@@ -43,16 +43,16 @@ type array struct {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (a *array) Retain() {
- atomic.AddInt64(&a.refCount, 1)
+ a.refCount.Add(1)
}
// Release decreases the reference count by 1.
// Release may be called simultaneously from multiple goroutines.
// When the reference count goes to zero, the memory is freed.
func (a *array) Release() {
- debug.Assert(atomic.LoadInt64(&a.refCount) > 0, "too many releases")
+ debug.Assert(a.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&a.refCount, -1) == 0 {
+ if a.refCount.Add(-1) == 0 {
a.data.Release()
a.data, a.nullBitmapBytes = nil, nil
}
@@ -109,9 +109,7 @@ func (a *array) Offset() int {
type arrayConstructorFn func(arrow.ArrayData) arrow.Array
-var (
- makeArrayFn [64]arrayConstructorFn
-)
+var makeArrayFn [64]arrayConstructorFn
func invalidDataType(data arrow.ArrayData) arrow.Array {
panic("invalid data type: " + data.DataType().ID().String())
diff --git a/arrow/array/binary.go b/arrow/array/binary.go
index ab53c8a..5fef60e 100644
--- a/arrow/array/binary.go
+++ b/arrow/array/binary.go
@@ -45,7 +45,7 @@ type Binary struct {
// NewBinaryData constructs a new Binary array from data.
func NewBinaryData(data arrow.ArrayData) *Binary {
a := &Binary{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -189,7 +189,7 @@ type LargeBinary struct {
func NewLargeBinaryData(data arrow.ArrayData) *LargeBinary {
a := &LargeBinary{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -208,6 +208,7 @@ func (a *LargeBinary) ValueStr(i int) string {
}
return base64.StdEncoding.EncodeToString(a.Value(i))
}
+
func (a *LargeBinary) ValueString(i int) string {
b := a.Value(i)
return *(*string)(unsafe.Pointer(&b))
@@ -333,7 +334,7 @@ type BinaryView struct {
func NewBinaryViewData(data arrow.ArrayData) *BinaryView {
a := &BinaryView{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
diff --git a/arrow/array/binarybuilder.go b/arrow/array/binarybuilder.go
index 794ac68..8b162c7 100644
--- a/arrow/array/binarybuilder.go
+++ b/arrow/array/binarybuilder.go
@@ -22,7 +22,6 @@ import (
"fmt"
"math"
"reflect"
- "sync/atomic"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
@@ -72,8 +71,8 @@ func NewBinaryBuilder(mem memory.Allocator, dtype
arrow.BinaryDataType) *BinaryB
offsetByteWidth = arrow.Int64SizeBytes
}
- b := &BinaryBuilder{
- builder: builder{refCount: 1, mem: mem},
+ bb := &BinaryBuilder{
+ builder: builder{mem: mem},
dtype: dtype,
offsets: offsets,
values: newByteBufferBuilder(mem),
@@ -82,7 +81,8 @@ func NewBinaryBuilder(mem memory.Allocator, dtype
arrow.BinaryDataType) *BinaryB
offsetByteWidth: offsetByteWidth,
getOffsetVal: getOffsetVal,
}
- return b
+ bb.builder.refCount.Add(1)
+ return bb
}
func (b *BinaryBuilder) Type() arrow.DataType { return b.dtype }
@@ -91,9 +91,9 @@ func (b *BinaryBuilder) Type() arrow.DataType { return
b.dtype }
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (b *BinaryBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -387,18 +387,19 @@ type BinaryViewBuilder struct {
}
func NewBinaryViewBuilder(mem memory.Allocator) *BinaryViewBuilder {
- return &BinaryViewBuilder{
+ bvb := &BinaryViewBuilder{
dtype: arrow.BinaryTypes.BinaryView,
builder: builder{
- refCount: 1,
- mem: mem,
+ mem: mem,
},
blockBuilder: multiBufferBuilder{
- refCount: 1,
blockSize: dfltBlockSize,
mem: mem,
},
}
+ bvb.builder.refCount.Add(1)
+ bvb.blockBuilder.refCount.Add(1)
+ return bvb
}
func (b *BinaryViewBuilder) SetBlockSize(sz uint) {
@@ -408,9 +409,9 @@ func (b *BinaryViewBuilder) SetBlockSize(sz uint) {
func (b *BinaryViewBuilder) Type() arrow.DataType { return b.dtype }
func (b *BinaryViewBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) != 0 {
+ if b.refCount.Add(-1) != 0 {
return
}
@@ -673,7 +674,8 @@ func (b *BinaryViewBuilder) newData() (data *Data) {
dataBuffers := b.blockBuilder.Finish()
data = NewData(b.dtype, b.length, append([]*memory.Buffer{
- b.nullBitmap, b.data}, dataBuffers...), nil, b.nulls, 0)
+ b.nullBitmap, b.data,
+ }, dataBuffers...), nil, b.nulls, 0)
b.reset()
if b.data != nil {
diff --git a/arrow/array/boolean.go b/arrow/array/boolean.go
index e610996..1b28a9f 100644
--- a/arrow/array/boolean.go
+++ b/arrow/array/boolean.go
@@ -44,7 +44,7 @@ func NewBoolean(length int, data *memory.Buffer, nullBitmap
*memory.Buffer, null
func NewBooleanData(data arrow.ArrayData) *Boolean {
a := &Boolean{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
diff --git a/arrow/array/booleanbuilder.go b/arrow/array/booleanbuilder.go
index 951fe3a..a277ffd 100644
--- a/arrow/array/booleanbuilder.go
+++ b/arrow/array/booleanbuilder.go
@@ -21,7 +21,6 @@ import (
"fmt"
"reflect"
"strconv"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
@@ -38,7 +37,9 @@ type BooleanBuilder struct {
}
func NewBooleanBuilder(mem memory.Allocator) *BooleanBuilder {
- return &BooleanBuilder{builder: builder{refCount: 1, mem: mem}}
+ bb := &BooleanBuilder{builder: builder{mem: mem}}
+ bb.builder.refCount.Add(1)
+ return bb
}
func (b *BooleanBuilder) Type() arrow.DataType { return
arrow.FixedWidthTypes.Boolean }
@@ -47,9 +48,9 @@ func (b *BooleanBuilder) Type() arrow.DataType { return
arrow.FixedWidthTypes.Bo
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (b *BooleanBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -258,6 +259,4 @@ func (b *BooleanBuilder) Value(i int) bool {
return bitutil.BitIsSet(b.rawData, i)
}
-var (
- _ Builder = (*BooleanBuilder)(nil)
-)
+var _ Builder = (*BooleanBuilder)(nil)
diff --git a/arrow/array/bufferbuilder.go b/arrow/array/bufferbuilder.go
index 085d43e..bc784d6 100644
--- a/arrow/array/bufferbuilder.go
+++ b/arrow/array/bufferbuilder.go
@@ -43,7 +43,7 @@ type bufBuilder interface {
// A bufferBuilder provides common functionality for populating memory with a
sequence of type-specific values.
// Specialized implementations provide type-safe APIs for appending and
accessing the memory.
type bufferBuilder struct {
- refCount int64
+ refCount atomic.Int64
mem memory.Allocator
buffer *memory.Buffer
length int
@@ -55,16 +55,16 @@ type bufferBuilder struct {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (b *bufferBuilder) Retain() {
- atomic.AddInt64(&b.refCount, 1)
+ b.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (b *bufferBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.buffer != nil {
b.buffer.Release()
b.buffer, b.bytes = nil, nil
@@ -155,7 +155,7 @@ func (b *bufferBuilder) unsafeAppend(data []byte) {
}
type multiBufferBuilder struct {
- refCount int64
+ refCount atomic.Int64
blockSize int
mem memory.Allocator
@@ -166,16 +166,16 @@ type multiBufferBuilder struct {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (b *multiBufferBuilder) Retain() {
- atomic.AddInt64(&b.refCount, 1)
+ b.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (b *multiBufferBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
b.Reset()
}
}
diff --git a/arrow/array/bufferbuilder_byte.go
b/arrow/array/bufferbuilder_byte.go
index 78bb938..61431b7 100644
--- a/arrow/array/bufferbuilder_byte.go
+++ b/arrow/array/bufferbuilder_byte.go
@@ -23,7 +23,9 @@ type byteBufferBuilder struct {
}
func newByteBufferBuilder(mem memory.Allocator) *byteBufferBuilder {
- return &byteBufferBuilder{bufferBuilder: bufferBuilder{refCount: 1,
mem: mem}}
+ bbb := &byteBufferBuilder{bufferBuilder: bufferBuilder{mem: mem}}
+ bbb.bufferBuilder.refCount.Add(1)
+ return bbb
}
func (b *byteBufferBuilder) Values() []byte { return b.Bytes() }
diff --git a/arrow/array/bufferbuilder_numeric.gen.go
b/arrow/array/bufferbuilder_numeric.gen.go
index 3812c5e..e887fbf 100644
--- a/arrow/array/bufferbuilder_numeric.gen.go
+++ b/arrow/array/bufferbuilder_numeric.gen.go
@@ -29,7 +29,9 @@ type int64BufferBuilder struct {
}
func newInt64BufferBuilder(mem memory.Allocator) *int64BufferBuilder {
- return &int64BufferBuilder{bufferBuilder: bufferBuilder{refCount: 1,
mem: mem}}
+ b := &int64BufferBuilder{bufferBuilder: bufferBuilder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
// AppendValues appends the contents of v to the buffer, growing the buffer as
needed.
@@ -62,7 +64,9 @@ type int32BufferBuilder struct {
}
func newInt32BufferBuilder(mem memory.Allocator) *int32BufferBuilder {
- return &int32BufferBuilder{bufferBuilder: bufferBuilder{refCount: 1,
mem: mem}}
+ b := &int32BufferBuilder{bufferBuilder: bufferBuilder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
// AppendValues appends the contents of v to the buffer, growing the buffer as
needed.
@@ -95,7 +99,9 @@ type int8BufferBuilder struct {
}
func newInt8BufferBuilder(mem memory.Allocator) *int8BufferBuilder {
- return &int8BufferBuilder{bufferBuilder: bufferBuilder{refCount: 1,
mem: mem}}
+ b := &int8BufferBuilder{bufferBuilder: bufferBuilder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
// AppendValues appends the contents of v to the buffer, growing the buffer as
needed.
diff --git a/arrow/array/bufferbuilder_numeric.gen.go.tmpl
b/arrow/array/bufferbuilder_numeric.gen.go.tmpl
index c3c39de..3582057 100644
--- a/arrow/array/bufferbuilder_numeric.gen.go.tmpl
+++ b/arrow/array/bufferbuilder_numeric.gen.go.tmpl
@@ -30,7 +30,9 @@ type {{$TypeNamePrefix}}BufferBuilder struct {
}
func new{{.Name}}BufferBuilder(mem memory.Allocator)
*{{$TypeNamePrefix}}BufferBuilder {
- return
&{{$TypeNamePrefix}}BufferBuilder{bufferBuilder:bufferBuilder{refCount: 1,
mem:mem}}
+ b :=
&{{$TypeNamePrefix}}BufferBuilder{bufferBuilder:bufferBuilder{mem:mem}}
+ b.refCount.Add(1)
+ return b
}
// AppendValues appends the contents of v to the buffer, growing the buffer as
needed.
diff --git a/arrow/array/builder.go b/arrow/array/builder.go
index f5b5ee4..0b3a4e9 100644
--- a/arrow/array/builder.go
+++ b/arrow/array/builder.go
@@ -102,7 +102,7 @@ type Builder interface {
// builder provides common functionality for managing the validity bitmap
(nulls) when building arrays.
type builder struct {
- refCount int64
+ refCount atomic.Int64
mem memory.Allocator
nullBitmap *memory.Buffer
nulls int
@@ -113,7 +113,7 @@ type builder struct {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (b *builder) Retain() {
- atomic.AddInt64(&b.refCount, 1)
+ b.refCount.Add(1)
}
// Len returns the number of elements in the array builder.
diff --git a/arrow/array/concat.go b/arrow/array/concat.go
index bb50354..8f6aefb 100644
--- a/arrow/array/concat.go
+++ b/arrow/array/concat.go
@@ -517,7 +517,9 @@ func concatListView(data []arrow.ArrayData, offsetType
arrow.FixedWidthDataType,
// concat is the implementation for actually performing the concatenation of
the arrow.ArrayData
// objects that we can call internally for nested types.
func concat(data []arrow.ArrayData, mem memory.Allocator) (arr
arrow.ArrayData, err error) {
- out := &Data{refCount: 1, dtype: data[0].DataType(), nulls: 0}
+ out := &Data{dtype: data[0].DataType(), nulls: 0}
+ out.refCount.Add(1)
+
defer func() {
if pErr := recover(); pErr != nil {
err = utils.FormatRecoveredError("arrow/concat", pErr)
diff --git a/arrow/array/data.go b/arrow/array/data.go
index be75c7c..62284b3 100644
--- a/arrow/array/data.go
+++ b/arrow/array/data.go
@@ -29,7 +29,7 @@ import (
// Data represents the memory and metadata of an Arrow array.
type Data struct {
- refCount int64
+ refCount atomic.Int64
dtype arrow.DataType
nulls int
offset int
@@ -56,8 +56,7 @@ func NewData(dtype arrow.DataType, length int, buffers
[]*memory.Buffer, childDa
}
}
- return &Data{
- refCount: 1,
+ d := &Data{
dtype: dtype,
nulls: nulls,
length: length,
@@ -65,6 +64,8 @@ func NewData(dtype arrow.DataType, length int, buffers
[]*memory.Buffer, childDa
buffers: buffers,
childData: childData,
}
+ d.refCount.Add(1)
+ return d
}
// NewDataWithDictionary creates a new data object, but also sets the provided
dictionary into the data if it's not nil
@@ -129,16 +130,16 @@ func (d *Data) Reset(dtype arrow.DataType, length int,
buffers []*memory.Buffer,
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (d *Data) Retain() {
- atomic.AddInt64(&d.refCount, 1)
+ d.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (d *Data) Release() {
- debug.Assert(atomic.LoadInt64(&d.refCount) > 0, "too many releases")
+ debug.Assert(d.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&d.refCount, -1) == 0 {
+ if d.refCount.Add(-1) == 0 {
for _, b := range d.buffers {
if b != nil {
b.Release()
@@ -246,7 +247,6 @@ func NewSliceData(data arrow.ArrayData, i, j int64)
arrow.ArrayData {
}
o := &Data{
- refCount: 1,
dtype: data.DataType(),
nulls: UnknownNullCount,
length: int(j - i),
@@ -255,6 +255,7 @@ func NewSliceData(data arrow.ArrayData, i, j int64)
arrow.ArrayData {
childData: data.Children(),
dictionary: data.(*Data).dictionary,
}
+ o.refCount.Add(1)
if data.NullN() == 0 {
o.nulls = 0
diff --git a/arrow/array/decimal.go b/arrow/array/decimal.go
index 8bbbc1a..dff0fea 100644
--- a/arrow/array/decimal.go
+++ b/arrow/array/decimal.go
@@ -21,7 +21,6 @@ import (
"fmt"
"reflect"
"strings"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
@@ -45,7 +44,7 @@ func newDecimalData[T interface {
decimal.Num[T]
}](data arrow.ArrayData) *baseDecimal[T] {
a := &baseDecimal[T]{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -148,11 +147,13 @@ func NewDecimal256Data(data arrow.ArrayData) *Decimal256 {
return newDecimalData[decimal.Decimal256](data)
}
-type Decimal32Builder = baseDecimalBuilder[decimal.Decimal32]
-type Decimal64Builder = baseDecimalBuilder[decimal.Decimal64]
-type Decimal128Builder struct {
- *baseDecimalBuilder[decimal.Decimal128]
-}
+type (
+ Decimal32Builder = baseDecimalBuilder[decimal.Decimal32]
+ Decimal64Builder = baseDecimalBuilder[decimal.Decimal64]
+ Decimal128Builder struct {
+ *baseDecimalBuilder[decimal.Decimal128]
+ }
+)
func (b *Decimal128Builder) NewDecimal128Array() *Decimal128 {
return b.NewDecimalArray()
@@ -182,18 +183,20 @@ func newDecimalBuilder[T interface {
decimal.DecimalTypes
decimal.Num[T]
}, DT arrow.DecimalType](mem memory.Allocator, dtype DT)
*baseDecimalBuilder[T] {
- return &baseDecimalBuilder[T]{
- builder: builder{refCount: 1, mem: mem},
+ bdb := &baseDecimalBuilder[T]{
+ builder: builder{mem: mem},
dtype: dtype,
}
+ bdb.builder.refCount.Add(1)
+ return bdb
}
func (b *baseDecimalBuilder[T]) Type() arrow.DataType { return b.dtype }
func (b *baseDecimalBuilder[T]) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
diff --git a/arrow/array/dictionary.go b/arrow/array/dictionary.go
index 0c23934..88df0cb 100644
--- a/arrow/array/dictionary.go
+++ b/arrow/array/dictionary.go
@@ -22,7 +22,6 @@ import (
"fmt"
"math"
"math/bits"
- "sync/atomic"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
@@ -66,7 +65,7 @@ type Dictionary struct {
// and dictionary using the given type.
func NewDictionaryArray(typ arrow.DataType, indices, dict arrow.Array)
*Dictionary {
a := &Dictionary{}
- a.array.refCount = 1
+ a.array.refCount.Add(1)
dictdata := NewData(typ, indices.Len(), indices.Data().Buffers(),
indices.Data().Children(), indices.NullN(), indices.Data().Offset())
dictdata.dictionary = dict.Data().(*Data)
dict.Data().Retain()
@@ -188,19 +187,19 @@ func NewValidatedDictionaryArray(typ
*arrow.DictionaryType, indices, dict arrow.
// an ArrayData object with a datatype of arrow.Dictionary and a dictionary
func NewDictionaryData(data arrow.ArrayData) *Dictionary {
a := &Dictionary{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
func (d *Dictionary) Retain() {
- atomic.AddInt64(&d.refCount, 1)
+ d.refCount.Add(1)
}
func (d *Dictionary) Release() {
- debug.Assert(atomic.LoadInt64(&d.refCount) > 0, "too many releases")
+ debug.Assert(d.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&d.refCount, -1) == 0 {
+ if d.refCount.Add(-1) == 0 {
d.data.Release()
d.data, d.nullBitmapBytes = nil, nil
d.indices.Release()
@@ -444,12 +443,14 @@ func NewDictionaryBuilderWithDict(mem memory.Allocator,
dt *arrow.DictionaryType
}
bldr := dictionaryBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
idxBuilder: idxbldr,
memoTable: memo,
dt: dt,
}
+ bldr.builder.refCount.Add(1)
+
switch dt.ValueType.ID() {
case arrow.NULL:
ret := &NullDictionaryBuilder{bldr}
@@ -696,9 +697,9 @@ func NewDictionaryBuilder(mem memory.Allocator, dt
*arrow.DictionaryType) Dictio
func (b *dictionaryBuilder) Type() arrow.DataType { return b.dt }
func (b *dictionaryBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
b.idxBuilder.Release()
b.idxBuilder.Builder = nil
if binmemo, ok := b.memoTable.(*hashing.BinaryMemoTable); ok {
@@ -820,7 +821,7 @@ func (b *dictionaryBuilder) newData() *Data {
func (b *dictionaryBuilder) NewDictionaryArray() *Dictionary {
a := &Dictionary{}
- a.refCount = 1
+ a.refCount.Add(1)
indices := b.newData()
a.setData(indices)
@@ -1274,6 +1275,7 @@ type MonthIntervalDictionaryBuilder struct {
func (b *MonthIntervalDictionaryBuilder) Append(v arrow.MonthInterval) error {
return b.appendValue(int32(v))
}
+
func (b *MonthIntervalDictionaryBuilder) InsertDictValues(arr *MonthInterval)
(err error) {
for _, v := range arr.values {
if err = b.insertDictValue(int32(v)); err != nil {
@@ -1351,6 +1353,7 @@ func (b *BinaryDictionaryBuilder) InsertDictValues(arr
*Binary) (err error) {
}
return
}
+
func (b *BinaryDictionaryBuilder) InsertStringDictValues(arr *String) (err
error) {
if !arrow.TypeEqual(arr.DataType(), b.dt.ValueType) {
return fmt.Errorf("dictionary insert type mismatch: cannot
insert values of type %T to dictionary type %T", arr.DataType(), b.dt.ValueType)
@@ -1407,6 +1410,7 @@ type FixedSizeBinaryDictionaryBuilder struct {
func (b *FixedSizeBinaryDictionaryBuilder) Append(v []byte) error {
return b.appendValue(v[:b.byteWidth])
}
+
func (b *FixedSizeBinaryDictionaryBuilder) InsertDictValues(arr
*FixedSizeBinary) (err error) {
var (
beg = arr.array.data.offset * b.byteWidth
@@ -1429,6 +1433,7 @@ type Decimal32DictionaryBuilder struct {
func (b *Decimal32DictionaryBuilder) Append(v decimal.Decimal32) error {
return
b.appendValue((*(*[arrow.Decimal32SizeBytes]byte)(unsafe.Pointer(&v)))[:])
}
+
func (b *Decimal32DictionaryBuilder) InsertDictValues(arr *Decimal32) (err
error) {
data := arrow.Decimal32Traits.CastToBytes(arr.values)
for len(data) > 0 {
@@ -1447,6 +1452,7 @@ type Decimal64DictionaryBuilder struct {
func (b *Decimal64DictionaryBuilder) Append(v decimal.Decimal64) error {
return
b.appendValue((*(*[arrow.Decimal64SizeBytes]byte)(unsafe.Pointer(&v)))[:])
}
+
func (b *Decimal64DictionaryBuilder) InsertDictValues(arr *Decimal64) (err
error) {
data := arrow.Decimal64Traits.CastToBytes(arr.values)
for len(data) > 0 {
@@ -1465,6 +1471,7 @@ type Decimal128DictionaryBuilder struct {
func (b *Decimal128DictionaryBuilder) Append(v decimal128.Num) error {
return
b.appendValue((*(*[arrow.Decimal128SizeBytes]byte)(unsafe.Pointer(&v)))[:])
}
+
func (b *Decimal128DictionaryBuilder) InsertDictValues(arr *Decimal128) (err
error) {
data := arrow.Decimal128Traits.CastToBytes(arr.values)
for len(data) > 0 {
@@ -1483,6 +1490,7 @@ type Decimal256DictionaryBuilder struct {
func (b *Decimal256DictionaryBuilder) Append(v decimal256.Num) error {
return
b.appendValue((*(*[arrow.Decimal256SizeBytes]byte)(unsafe.Pointer(&v)))[:])
}
+
func (b *Decimal256DictionaryBuilder) InsertDictValues(arr *Decimal256) (err
error) {
data := arrow.Decimal256Traits.CastToBytes(arr.values)
for len(data) > 0 {
@@ -1501,6 +1509,7 @@ type MonthDayNanoDictionaryBuilder struct {
func (b *MonthDayNanoDictionaryBuilder) Append(v arrow.MonthDayNanoInterval)
error {
return
b.appendValue((*(*[arrow.MonthDayNanoIntervalSizeBytes]byte)(unsafe.Pointer(&v)))[:])
}
+
func (b *MonthDayNanoDictionaryBuilder) InsertDictValues(arr
*MonthDayNanoInterval) (err error) {
data := arrow.MonthDayNanoIntervalTraits.CastToBytes(arr.values)
for len(data) > 0 {
@@ -1519,6 +1528,7 @@ type DayTimeDictionaryBuilder struct {
func (b *DayTimeDictionaryBuilder) Append(v arrow.DayTimeInterval) error {
return
b.appendValue((*(*[arrow.DayTimeIntervalSizeBytes]byte)(unsafe.Pointer(&v)))[:])
}
+
func (b *DayTimeDictionaryBuilder) InsertDictValues(arr *DayTimeInterval) (err
error) {
data := arrow.DayTimeIntervalTraits.CastToBytes(arr.values)
for len(data) > 0 {
diff --git a/arrow/array/encoded.go b/arrow/array/encoded.go
index 81c375c..8e39090 100644
--- a/arrow/array/encoded.go
+++ b/arrow/array/encoded.go
@@ -21,7 +21,6 @@ import (
"fmt"
"math"
"reflect"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/encoded"
@@ -50,7 +49,7 @@ func NewRunEndEncodedArray(runEnds, values arrow.Array,
logicalLength, offset in
func NewRunEndEncodedData(data arrow.ArrayData) *RunEndEncoded {
r := &RunEndEncoded{}
- r.refCount = 1
+ r.refCount.Add(1)
r.setData(data.(*Data))
return r
}
@@ -305,14 +304,16 @@ func NewRunEndEncodedBuilder(mem memory.Allocator,
runEnds, encoded arrow.DataTy
case arrow.INT64:
maxEnd = math.MaxInt64
}
- return &RunEndEncodedBuilder{
- builder: builder{refCount: 1, mem: mem},
+ reb := &RunEndEncodedBuilder{
+ builder: builder{mem: mem},
dt: dt,
runEnds: NewBuilder(mem, runEnds),
values: NewBuilder(mem, encoded),
maxRunEnd: maxEnd,
lastUnmarshalled: nil,
}
+ reb.builder.refCount.Add(1)
+ return reb
}
func (b *RunEndEncodedBuilder) Type() arrow.DataType {
@@ -320,9 +321,9 @@ func (b *RunEndEncodedBuilder) Type() arrow.DataType {
}
func (b *RunEndEncodedBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
b.values.Release()
b.runEnds.Release()
}
diff --git a/arrow/array/extension.go b/arrow/array/extension.go
index d1a2835..4177303 100644
--- a/arrow/array/extension.go
+++ b/arrow/array/extension.go
@@ -86,7 +86,7 @@ func NewExtensionArrayWithStorage(dt arrow.ExtensionType,
storage arrow.Array) a
// underlying data built for the storage array.
func NewExtensionData(data arrow.ArrayData) ExtensionArray {
base := ExtensionArrayBase{}
- base.refCount = 1
+ base.refCount.Add(1)
base.setData(data.(*Data))
// use the ExtensionType's ArrayType to construct the correctly typed
object
diff --git a/arrow/array/fixed_size_list.go b/arrow/array/fixed_size_list.go
index 84036f9..4a0524e 100644
--- a/arrow/array/fixed_size_list.go
+++ b/arrow/array/fixed_size_list.go
@@ -20,7 +20,6 @@ import (
"bytes"
"fmt"
"strings"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
@@ -41,7 +40,7 @@ var _ ListLike = (*FixedSizeList)(nil)
// NewFixedSizeListData returns a new List array value, from data.
func NewFixedSizeListData(data arrow.ArrayData) *FixedSizeList {
a := &FixedSizeList{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -54,6 +53,7 @@ func (a *FixedSizeList) ValueStr(i int) string {
}
return string(a.GetOneForMarshal(i).(json.RawMessage))
}
+
func (a *FixedSizeList) String() string {
o := new(strings.Builder)
o.WriteString("[")
@@ -169,28 +169,33 @@ type FixedSizeListBuilder struct {
// NewFixedSizeListBuilder returns a builder, using the provided memory
allocator.
// The created list builder will create a list whose elements will be of type
etype.
func NewFixedSizeListBuilder(mem memory.Allocator, n int32, etype
arrow.DataType) *FixedSizeListBuilder {
- return &FixedSizeListBuilder{
+ fslb := &FixedSizeListBuilder{
baseListBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
values: NewBuilder(mem, etype),
dt: arrow.FixedSizeListOf(n, etype),
},
n,
}
+ fslb.baseListBuilder.builder.refCount.Add(1)
+ return fslb
}
// NewFixedSizeListBuilderWithField returns a builder similarly to
// NewFixedSizeListBuilder, but it accepts a child rather than just a datatype
// to ensure nullability context is preserved.
func NewFixedSizeListBuilderWithField(mem memory.Allocator, n int32, field
arrow.Field) *FixedSizeListBuilder {
- return &FixedSizeListBuilder{
+ fslb := &FixedSizeListBuilder{
baseListBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
values: NewBuilder(mem, field.Type),
dt: arrow.FixedSizeListOfField(n, field),
},
n,
}
+
+ fslb.baseListBuilder.builder.refCount.Add(1)
+ return fslb
}
func (b *FixedSizeListBuilder) Type() arrow.DataType { return b.dt }
@@ -198,9 +203,9 @@ func (b *FixedSizeListBuilder) Type() arrow.DataType {
return b.dt }
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *FixedSizeListBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
diff --git a/arrow/array/fixedsize_binary.go b/arrow/array/fixedsize_binary.go
index 7049c9c..a3b0380 100644
--- a/arrow/array/fixedsize_binary.go
+++ b/arrow/array/fixedsize_binary.go
@@ -37,7 +37,7 @@ type FixedSizeBinary struct {
// NewFixedSizeBinaryData constructs a new fixed-size binary array from data.
func NewFixedSizeBinaryData(data arrow.ArrayData) *FixedSizeBinary {
a := &FixedSizeBinary{bytewidth:
int32(data.DataType().(arrow.FixedWidthDataType).BitWidth() / 8)}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -52,6 +52,7 @@ func (a *FixedSizeBinary) Value(i int) []byte {
)
return a.valueBytes[beg:end]
}
+
func (a *FixedSizeBinary) ValueStr(i int) string {
if a.IsNull(i) {
return NullValueStr
@@ -83,7 +84,6 @@ func (a *FixedSizeBinary) setData(data *Data) {
if vals != nil {
a.valueBytes = vals.Bytes()
}
-
}
func (a *FixedSizeBinary) GetOneForMarshal(i int) interface{} {
@@ -118,6 +118,4 @@ func arrayEqualFixedSizeBinary(left, right
*FixedSizeBinary) bool {
return true
}
-var (
- _ arrow.Array = (*FixedSizeBinary)(nil)
-)
+var _ arrow.Array = (*FixedSizeBinary)(nil)
diff --git a/arrow/array/fixedsize_binarybuilder.go
b/arrow/array/fixedsize_binarybuilder.go
index 02e72a2..ee7869f 100644
--- a/arrow/array/fixedsize_binarybuilder.go
+++ b/arrow/array/fixedsize_binarybuilder.go
@@ -21,7 +21,6 @@ import (
"encoding/base64"
"fmt"
"reflect"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
@@ -39,10 +38,11 @@ type FixedSizeBinaryBuilder struct {
func NewFixedSizeBinaryBuilder(mem memory.Allocator, dtype
*arrow.FixedSizeBinaryType) *FixedSizeBinaryBuilder {
b := &FixedSizeBinaryBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
dtype: dtype,
values: newByteBufferBuilder(mem),
}
+ b.builder.refCount.Add(1)
return b
}
@@ -52,9 +52,9 @@ func (b *FixedSizeBinaryBuilder) Type() arrow.DataType {
return b.dtype }
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (b *FixedSizeBinaryBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -256,6 +256,4 @@ func (b *FixedSizeBinaryBuilder) UnmarshalJSON(data []byte)
error {
return b.Unmarshal(dec)
}
-var (
- _ Builder = (*FixedSizeBinaryBuilder)(nil)
-)
+var _ Builder = (*FixedSizeBinaryBuilder)(nil)
diff --git a/arrow/array/float16.go b/arrow/array/float16.go
index a472cfa..5f57f72 100644
--- a/arrow/array/float16.go
+++ b/arrow/array/float16.go
@@ -33,7 +33,7 @@ type Float16 struct {
func NewFloat16Data(data arrow.ArrayData) *Float16 {
a := &Float16{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
diff --git a/arrow/array/float16_builder.go b/arrow/array/float16_builder.go
index 93dbfbc..d4acd7f 100644
--- a/arrow/array/float16_builder.go
+++ b/arrow/array/float16_builder.go
@@ -21,7 +21,6 @@ import (
"fmt"
"reflect"
"strconv"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
@@ -39,7 +38,9 @@ type Float16Builder struct {
}
func NewFloat16Builder(mem memory.Allocator) *Float16Builder {
- return &Float16Builder{builder: builder{refCount: 1, mem: mem}}
+ fb := &Float16Builder{builder: builder{mem: mem}}
+ fb.refCount.Add(1)
+ return fb
}
func (b *Float16Builder) Type() arrow.DataType { return
arrow.FixedWidthTypes.Float16 }
@@ -47,9 +48,9 @@ func (b *Float16Builder) Type() arrow.DataType { return
arrow.FixedWidthTypes.Fl
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Float16Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
diff --git a/arrow/array/interval.go b/arrow/array/interval.go
index a9c52d3..1f16d71 100644
--- a/arrow/array/interval.go
+++ b/arrow/array/interval.go
@@ -21,7 +21,6 @@ import (
"fmt"
"strconv"
"strings"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
@@ -51,7 +50,7 @@ type MonthInterval struct {
func NewMonthIntervalData(data arrow.ArrayData) *MonthInterval {
a := &MonthInterval{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -140,7 +139,9 @@ type MonthIntervalBuilder struct {
}
func NewMonthIntervalBuilder(mem memory.Allocator) *MonthIntervalBuilder {
- return &MonthIntervalBuilder{builder: builder{refCount: 1, mem: mem}}
+ mib := &MonthIntervalBuilder{builder: builder{mem: mem}}
+ mib.refCount.Add(1)
+ return mib
}
func (b *MonthIntervalBuilder) Type() arrow.DataType { return
arrow.FixedWidthTypes.MonthInterval }
@@ -148,9 +149,9 @@ func (b *MonthIntervalBuilder) Type() arrow.DataType {
return arrow.FixedWidthTy
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *MonthIntervalBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -348,7 +349,7 @@ type DayTimeInterval struct {
func NewDayTimeIntervalData(data arrow.ArrayData) *DayTimeInterval {
a := &DayTimeInterval{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -440,7 +441,9 @@ type DayTimeIntervalBuilder struct {
}
func NewDayTimeIntervalBuilder(mem memory.Allocator) *DayTimeIntervalBuilder {
- return &DayTimeIntervalBuilder{builder: builder{refCount: 1, mem: mem}}
+ dtb := &DayTimeIntervalBuilder{builder: builder{mem: mem}}
+ dtb.refCount.Add(1)
+ return dtb
}
func (b *DayTimeIntervalBuilder) Type() arrow.DataType { return
arrow.FixedWidthTypes.DayTimeInterval }
@@ -448,9 +451,9 @@ func (b *DayTimeIntervalBuilder) Type() arrow.DataType {
return arrow.FixedWidth
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *DayTimeIntervalBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -647,7 +650,7 @@ type MonthDayNanoInterval struct {
func NewMonthDayNanoIntervalData(data arrow.ArrayData) *MonthDayNanoInterval {
a := &MonthDayNanoInterval{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -741,7 +744,9 @@ type MonthDayNanoIntervalBuilder struct {
}
func NewMonthDayNanoIntervalBuilder(mem memory.Allocator)
*MonthDayNanoIntervalBuilder {
- return &MonthDayNanoIntervalBuilder{builder: builder{refCount: 1, mem:
mem}}
+ mb := &MonthDayNanoIntervalBuilder{builder: builder{mem: mem}}
+ mb.refCount.Add(1)
+ return mb
}
func (b *MonthDayNanoIntervalBuilder) Type() arrow.DataType {
@@ -751,9 +756,9 @@ func (b *MonthDayNanoIntervalBuilder) Type() arrow.DataType
{
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *MonthDayNanoIntervalBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
diff --git a/arrow/array/json_reader.go b/arrow/array/json_reader.go
index 7835b28..b0698b3 100644
--- a/arrow/array/json_reader.go
+++ b/arrow/array/json_reader.go
@@ -28,8 +28,10 @@ import (
"github.com/apache/arrow-go/v18/internal/json"
)
-type Option func(config)
-type config interface{}
+type (
+ Option func(config)
+ config interface{}
+)
// WithChunk sets the chunk size for reading in json records. The default is to
// read in one row per record batch as a single object. If chunk size is set to
@@ -72,7 +74,7 @@ type JSONReader struct {
bldr *RecordBuilder
- refs int64
+ refs atomic.Int64
cur arrow.Record
err error
@@ -93,9 +95,10 @@ func NewJSONReader(r io.Reader, schema *arrow.Schema, opts
...Option) *JSONReade
rr := &JSONReader{
r: json.NewDecoder(r),
schema: schema,
- refs: 1,
chunk: 1,
}
+ rr.refs.Add(1)
+
for _, o := range opts {
o(rr)
}
@@ -126,13 +129,13 @@ func (r *JSONReader) Schema() *arrow.Schema { return
r.schema }
func (r *JSONReader) Record() arrow.Record { return r.cur }
func (r *JSONReader) Retain() {
- atomic.AddInt64(&r.refs, 1)
+ r.refs.Add(1)
}
func (r *JSONReader) Release() {
- debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")
+ debug.Assert(r.refs.Load() > 0, "too many releases")
- if atomic.AddInt64(&r.refs, -1) == 0 {
+ if r.refs.Add(-1) == 0 {
if r.cur != nil {
r.cur.Release()
r.bldr.Release()
@@ -186,7 +189,7 @@ func (r *JSONReader) next1() bool {
}
func (r *JSONReader) nextn() bool {
- var n = 0
+ n := 0
for i := 0; i < r.chunk && !r.done; i, n = i+1, n+1 {
if !r.readNext() {
@@ -200,6 +203,4 @@ func (r *JSONReader) nextn() bool {
return n > 0
}
-var (
- _ RecordReader = (*JSONReader)(nil)
-)
+var _ RecordReader = (*JSONReader)(nil)
diff --git a/arrow/array/list.go b/arrow/array/list.go
index e80bc89..806b89c 100644
--- a/arrow/array/list.go
+++ b/arrow/array/list.go
@@ -20,7 +20,6 @@ import (
"bytes"
"fmt"
"strings"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
@@ -51,7 +50,7 @@ var _ ListLike = (*List)(nil)
// NewListData returns a new List array value, from data.
func NewListData(data arrow.ArrayData) *List {
a := &List{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -183,7 +182,7 @@ var _ ListLike = (*LargeList)(nil)
// NewLargeListData returns a new LargeList array value, from data.
func NewLargeListData(data arrow.ArrayData) *LargeList {
a := new(LargeList)
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -337,30 +336,34 @@ type LargeListBuilder struct {
// The created list builder will create a list whose elements will be of type
etype.
func NewListBuilder(mem memory.Allocator, etype arrow.DataType) *ListBuilder {
offsetBldr := NewInt32Builder(mem)
- return &ListBuilder{
+ lb := &ListBuilder{
baseListBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
values: NewBuilder(mem, etype),
offsets: offsetBldr,
dt: arrow.ListOf(etype),
appendOffsetVal: func(o int) {
offsetBldr.Append(int32(o)) },
},
}
+ lb.refCount.Add(1)
+ return lb
}
// NewListBuilderWithField takes a field to use for the child rather than just
// a datatype to allow for more customization.
func NewListBuilderWithField(mem memory.Allocator, field arrow.Field)
*ListBuilder {
offsetBldr := NewInt32Builder(mem)
- return &ListBuilder{
+ lb := &ListBuilder{
baseListBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
values: NewBuilder(mem, field.Type),
offsets: offsetBldr,
dt: arrow.ListOfField(field),
appendOffsetVal: func(o int) {
offsetBldr.Append(int32(o)) },
},
}
+ lb.refCount.Add(1)
+ return lb
}
func (b *baseListBuilder) Type() arrow.DataType {
@@ -381,38 +384,42 @@ func (b *baseListBuilder) Type() arrow.DataType {
// The created list builder will create a list whose elements will be of type
etype.
func NewLargeListBuilder(mem memory.Allocator, etype arrow.DataType)
*LargeListBuilder {
offsetBldr := NewInt64Builder(mem)
- return &LargeListBuilder{
+ llb := &LargeListBuilder{
baseListBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
values: NewBuilder(mem, etype),
offsets: offsetBldr,
dt: arrow.LargeListOf(etype),
appendOffsetVal: func(o int) {
offsetBldr.Append(int64(o)) },
},
}
+ llb.refCount.Add(1)
+ return llb
}
// NewLargeListBuilderWithField takes a field rather than just an element type
// to allow for more customization of the final type of the LargeList Array
func NewLargeListBuilderWithField(mem memory.Allocator, field arrow.Field)
*LargeListBuilder {
offsetBldr := NewInt64Builder(mem)
- return &LargeListBuilder{
+ llb := &LargeListBuilder{
baseListBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
values: NewBuilder(mem, field.Type),
offsets: offsetBldr,
dt: arrow.LargeListOfField(field),
appendOffsetVal: func(o int) {
offsetBldr.Append(int64(o)) },
},
}
+ llb.refCount.Add(1)
+ return llb
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *baseListBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -420,7 +427,6 @@ func (b *baseListBuilder) Release() {
b.values.Release()
b.offsets.Release()
}
-
}
func (b *baseListBuilder) appendNextOffset() {
@@ -646,7 +652,7 @@ var _ VarLenListLike = (*ListView)(nil)
func NewListViewData(data arrow.ArrayData) *ListView {
a := &ListView{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -793,7 +799,7 @@ var _ VarLenListLike = (*LargeListView)(nil)
// NewLargeListViewData returns a new LargeListView array value, from data.
func NewLargeListViewData(data arrow.ArrayData) *LargeListView {
a := new(LargeListView)
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -931,8 +937,10 @@ type offsetsAndSizes interface {
sizeAt(slot int64) int64
}
-var _ offsetsAndSizes = (*ListView)(nil)
-var _ offsetsAndSizes = (*LargeListView)(nil)
+var (
+ _ offsetsAndSizes = (*ListView)(nil)
+ _ offsetsAndSizes = (*LargeListView)(nil)
+)
func (a *ListView) offsetAt(slot int64) int64 { return
int64(a.offsets[int64(a.data.offset)+slot]) }
@@ -1081,9 +1089,9 @@ type LargeListViewBuilder struct {
func NewListViewBuilder(mem memory.Allocator, etype arrow.DataType)
*ListViewBuilder {
offsetBldr := NewInt32Builder(mem)
sizeBldr := NewInt32Builder(mem)
- return &ListViewBuilder{
+ lvb := &ListViewBuilder{
baseListViewBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
values: NewBuilder(mem, etype),
offsets: offsetBldr,
sizes: sizeBldr,
@@ -1092,6 +1100,8 @@ func NewListViewBuilder(mem memory.Allocator, etype
arrow.DataType) *ListViewBui
appendSizeVal: func(s int) {
sizeBldr.Append(int32(s)) },
},
}
+ lvb.refCount.Add(1)
+ return lvb
}
// NewListViewBuilderWithField takes a field to use for the child rather than
just
@@ -1099,9 +1109,9 @@ func NewListViewBuilder(mem memory.Allocator, etype
arrow.DataType) *ListViewBui
func NewListViewBuilderWithField(mem memory.Allocator, field arrow.Field)
*ListViewBuilder {
offsetBldr := NewInt32Builder(mem)
sizeBldr := NewInt32Builder(mem)
- return &ListViewBuilder{
+ lvb := &ListViewBuilder{
baseListViewBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
values: NewBuilder(mem, field.Type),
offsets: offsetBldr,
sizes: sizeBldr,
@@ -1110,6 +1120,8 @@ func NewListViewBuilderWithField(mem memory.Allocator,
field arrow.Field) *ListV
appendSizeVal: func(s int) {
sizeBldr.Append(int32(s)) },
},
}
+ lvb.refCount.Add(1)
+ return lvb
}
func (b *baseListViewBuilder) Type() arrow.DataType {
@@ -1131,9 +1143,9 @@ func (b *baseListViewBuilder) Type() arrow.DataType {
func NewLargeListViewBuilder(mem memory.Allocator, etype arrow.DataType)
*LargeListViewBuilder {
offsetBldr := NewInt64Builder(mem)
sizeBldr := NewInt64Builder(mem)
- return &LargeListViewBuilder{
+ llvb := &LargeListViewBuilder{
baseListViewBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
values: NewBuilder(mem, etype),
offsets: offsetBldr,
sizes: sizeBldr,
@@ -1142,6 +1154,8 @@ func NewLargeListViewBuilder(mem memory.Allocator, etype
arrow.DataType) *LargeL
appendSizeVal: func(s int) {
sizeBldr.Append(int64(s)) },
},
}
+ llvb.refCount.Add(1)
+ return llvb
}
// NewLargeListViewBuilderWithField takes a field rather than just an element
type
@@ -1149,9 +1163,9 @@ func NewLargeListViewBuilder(mem memory.Allocator, etype
arrow.DataType) *LargeL
func NewLargeListViewBuilderWithField(mem memory.Allocator, field arrow.Field)
*LargeListViewBuilder {
offsetBldr := NewInt64Builder(mem)
sizeBldr := NewInt64Builder(mem)
- return &LargeListViewBuilder{
+ llvb := &LargeListViewBuilder{
baseListViewBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
values: NewBuilder(mem, field.Type),
offsets: offsetBldr,
sizes: sizeBldr,
@@ -1160,14 +1174,17 @@ func NewLargeListViewBuilderWithField(mem
memory.Allocator, field arrow.Field) *
appendSizeVal: func(o int) {
sizeBldr.Append(int64(o)) },
},
}
+
+ llvb.refCount.Add(1)
+ return llvb
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *baseListViewBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
diff --git a/arrow/array/map.go b/arrow/array/map.go
index 5609ccd..da9a150 100644
--- a/arrow/array/map.go
+++ b/arrow/array/map.go
@@ -37,7 +37,7 @@ var _ ListLike = (*Map)(nil)
// NewMapData returns a new Map array value, from data
func NewMapData(data arrow.ArrayData) *Map {
a := &Map{List: &List{}}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
diff --git a/arrow/array/null.go b/arrow/array/null.go
index 76e56a4..38b3b09 100644
--- a/arrow/array/null.go
+++ b/arrow/array/null.go
@@ -21,7 +21,6 @@ import (
"fmt"
"reflect"
"strings"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/internal/debug"
@@ -37,7 +36,7 @@ type Null struct {
// NewNull returns a new Null array value of size n.
func NewNull(n int) *Null {
a := &Null{}
- a.refCount = 1
+ a.refCount.Add(1)
data := NewData(
arrow.Null, n,
[]*memory.Buffer{nil},
@@ -53,7 +52,7 @@ func NewNull(n int) *Null {
// NewNullData returns a new Null array value, from data.
func NewNullData(data arrow.ArrayData) *Null {
a := &Null{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -95,7 +94,9 @@ type NullBuilder struct {
// NewNullBuilder returns a builder, using the provided memory allocator.
func NewNullBuilder(mem memory.Allocator) *NullBuilder {
- return &NullBuilder{builder: builder{refCount: 1, mem: mem}}
+ nb := &NullBuilder{builder: builder{mem: mem}}
+ nb.refCount.Add(1)
+ return nb
}
func (b *NullBuilder) Type() arrow.DataType { return arrow.Null }
@@ -103,9 +104,9 @@ func (b *NullBuilder) Type() arrow.DataType { return
arrow.Null }
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *NullBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
diff --git a/arrow/array/numeric_generic.go b/arrow/array/numeric_generic.go
index 8c18edd..016dc37 100644
--- a/arrow/array/numeric_generic.go
+++ b/arrow/array/numeric_generic.go
@@ -34,7 +34,7 @@ type numericArray[T arrow.IntType | arrow.UintType |
arrow.FloatType] struct {
func newNumericData[T arrow.IntType | arrow.UintType | arrow.FloatType](data
arrow.ArrayData) numericArray[T] {
a := numericArray[T]{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
diff --git a/arrow/array/numericbuilder.gen.go
b/arrow/array/numericbuilder.gen.go
index 1618dba..be87fbf 100644
--- a/arrow/array/numericbuilder.gen.go
+++ b/arrow/array/numericbuilder.gen.go
@@ -24,7 +24,6 @@ import (
"reflect"
"strconv"
"strings"
- "sync/atomic"
"time"
"github.com/apache/arrow-go/v18/arrow"
@@ -42,7 +41,9 @@ type Int64Builder struct {
}
func NewInt64Builder(mem memory.Allocator) *Int64Builder {
- return &Int64Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Int64Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Int64Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Int64 }
@@ -50,9 +51,9 @@ func (b *Int64Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Int64
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Int64Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -281,7 +282,9 @@ type Uint64Builder struct {
}
func NewUint64Builder(mem memory.Allocator) *Uint64Builder {
- return &Uint64Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Uint64Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Uint64Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Uint64 }
@@ -289,9 +292,9 @@ func (b *Uint64Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Uint
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Uint64Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -520,7 +523,9 @@ type Float64Builder struct {
}
func NewFloat64Builder(mem memory.Allocator) *Float64Builder {
- return &Float64Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Float64Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Float64Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Float64 }
@@ -528,9 +533,9 @@ func (b *Float64Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Flo
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Float64Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -759,7 +764,9 @@ type Int32Builder struct {
}
func NewInt32Builder(mem memory.Allocator) *Int32Builder {
- return &Int32Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Int32Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Int32Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Int32 }
@@ -767,9 +774,9 @@ func (b *Int32Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Int32
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Int32Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -998,7 +1005,9 @@ type Uint32Builder struct {
}
func NewUint32Builder(mem memory.Allocator) *Uint32Builder {
- return &Uint32Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Uint32Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Uint32Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Uint32 }
@@ -1006,9 +1015,9 @@ func (b *Uint32Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Uint
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Uint32Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -1237,7 +1246,9 @@ type Float32Builder struct {
}
func NewFloat32Builder(mem memory.Allocator) *Float32Builder {
- return &Float32Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Float32Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Float32Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Float32 }
@@ -1245,9 +1256,9 @@ func (b *Float32Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Flo
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Float32Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -1476,7 +1487,9 @@ type Int16Builder struct {
}
func NewInt16Builder(mem memory.Allocator) *Int16Builder {
- return &Int16Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Int16Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Int16Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Int16 }
@@ -1484,9 +1497,9 @@ func (b *Int16Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Int16
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Int16Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -1715,7 +1728,9 @@ type Uint16Builder struct {
}
func NewUint16Builder(mem memory.Allocator) *Uint16Builder {
- return &Uint16Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Uint16Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Uint16Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Uint16 }
@@ -1723,9 +1738,9 @@ func (b *Uint16Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Uint
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Uint16Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -1954,7 +1969,9 @@ type Int8Builder struct {
}
func NewInt8Builder(mem memory.Allocator) *Int8Builder {
- return &Int8Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Int8Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Int8Builder) Type() arrow.DataType { return arrow.PrimitiveTypes.Int8
}
@@ -1962,9 +1979,9 @@ func (b *Int8Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Int8 }
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Int8Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -2193,7 +2210,9 @@ type Uint8Builder struct {
}
func NewUint8Builder(mem memory.Allocator) *Uint8Builder {
- return &Uint8Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Uint8Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Uint8Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Uint8 }
@@ -2201,9 +2220,9 @@ func (b *Uint8Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Uint8
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Uint8Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -2433,7 +2452,9 @@ type Time32Builder struct {
}
func NewTime32Builder(mem memory.Allocator, dtype *arrow.Time32Type)
*Time32Builder {
- return &Time32Builder{builder: builder{refCount: 1, mem: mem}, dtype:
dtype}
+ b := &Time32Builder{builder: builder{mem: mem}, dtype: dtype}
+ b.refCount.Add(1)
+ return b
}
func (b *Time32Builder) Type() arrow.DataType { return b.dtype }
@@ -2441,9 +2462,9 @@ func (b *Time32Builder) Type() arrow.DataType { return
b.dtype }
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Time32Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -2673,7 +2694,9 @@ type Time64Builder struct {
}
func NewTime64Builder(mem memory.Allocator, dtype *arrow.Time64Type)
*Time64Builder {
- return &Time64Builder{builder: builder{refCount: 1, mem: mem}, dtype:
dtype}
+ b := &Time64Builder{builder: builder{mem: mem}, dtype: dtype}
+ b.refCount.Add(1)
+ return b
}
func (b *Time64Builder) Type() arrow.DataType { return b.dtype }
@@ -2681,9 +2704,9 @@ func (b *Time64Builder) Type() arrow.DataType { return
b.dtype }
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Time64Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -2912,7 +2935,9 @@ type Date32Builder struct {
}
func NewDate32Builder(mem memory.Allocator) *Date32Builder {
- return &Date32Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Date32Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Date32Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Date32 }
@@ -2920,9 +2945,9 @@ func (b *Date32Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Date
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Date32Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -3151,7 +3176,9 @@ type Date64Builder struct {
}
func NewDate64Builder(mem memory.Allocator) *Date64Builder {
- return &Date64Builder{builder: builder{refCount: 1, mem: mem}}
+ b := &Date64Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *Date64Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Date64 }
@@ -3159,9 +3186,9 @@ func (b *Date64Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.Date
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *Date64Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
@@ -3391,7 +3418,9 @@ type DurationBuilder struct {
}
func NewDurationBuilder(mem memory.Allocator, dtype *arrow.DurationType)
*DurationBuilder {
- return &DurationBuilder{builder: builder{refCount: 1, mem: mem}, dtype:
dtype}
+ b := &DurationBuilder{builder: builder{mem: mem}, dtype: dtype}
+ b.refCount.Add(1)
+ return b
}
func (b *DurationBuilder) Type() arrow.DataType { return b.dtype }
@@ -3399,9 +3428,9 @@ func (b *DurationBuilder) Type() arrow.DataType { return
b.dtype }
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *DurationBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
diff --git a/arrow/array/numericbuilder.gen.go.tmpl
b/arrow/array/numericbuilder.gen.go.tmpl
index e84e095..518b3d4 100644
--- a/arrow/array/numericbuilder.gen.go.tmpl
+++ b/arrow/array/numericbuilder.gen.go.tmpl
@@ -38,14 +38,18 @@ type {{.Name}}Builder struct {
{{if .Opt.Parametric}}
func New{{.Name}}Builder(mem memory.Allocator, dtype *arrow.{{.Name}}Type)
*{{.Name}}Builder {
- return &{{.Name}}Builder{builder: builder{refCount:1, mem: mem}, dtype:
dtype}
+ b := &{{.Name}}Builder{builder: builder{mem: mem}, dtype: dtype}
+ b.refCount.Add(1)
+ return b
}
func (b *{{.Name}}Builder) Type() arrow.DataType { return b.dtype }
{{else}}
func New{{.Name}}Builder(mem memory.Allocator) *{{.Name}}Builder {
- return &{{.Name}}Builder{builder: builder{refCount:1, mem: mem}}
+ b := &{{.Name}}Builder{builder: builder{mem: mem}}
+ b.refCount.Add(1)
+ return b
}
func (b *{{.Name}}Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.{{.Name}} }
@@ -54,9 +58,9 @@ func (b *{{.Name}}Builder) Type() arrow.DataType { return
arrow.PrimitiveTypes.{
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *{{.Name}}Builder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
diff --git a/arrow/array/numericbuilder.gen_test.go
b/arrow/array/numericbuilder.gen_test.go
index 677a5dd..1336815 100644
--- a/arrow/array/numericbuilder.gen_test.go
+++ b/arrow/array/numericbuilder.gen_test.go
@@ -230,6 +230,30 @@ func TestInt64Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestInt64BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewInt64Builder(mem)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewInt64Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestUint64StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -432,6 +456,30 @@ func TestUint64Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestUint64BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewUint64Builder(mem)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewUint64Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestFloat64StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -858,6 +906,30 @@ func TestInt32Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestInt32BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewInt32Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestUint32StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -1060,6 +1132,30 @@ func TestUint32Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestUint32BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewUint32Builder(mem)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewUint32Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestFloat32StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -1486,6 +1582,30 @@ func TestInt16Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestInt16BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewInt16Builder(mem)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewInt16Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestUint16StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -1688,6 +1808,30 @@ func TestUint16Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestUint16BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewUint16Builder(mem)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewUint16Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestInt8StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -1890,6 +2034,30 @@ func TestInt8Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestInt8BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewInt8Builder(mem)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewInt8Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestUint8StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -2092,6 +2260,30 @@ func TestUint8Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestUint8BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewUint8Builder(mem)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewUint8Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestTime32StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -2299,6 +2491,31 @@ func TestTime32Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestTime32BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ dtype := &arrow.Time32Type{Unit: arrow.Second}
+ bldr := array.NewTime32Builder(mem, dtype)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewTime32Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestTime64StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -2506,6 +2723,31 @@ func TestTime64Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestTime64BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ dtype := &arrow.Time64Type{Unit: arrow.Second}
+ bldr := array.NewTime64Builder(mem, dtype)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewTime64Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestDate32StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -2708,6 +2950,30 @@ func TestDate32Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestDate32BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewDate32Builder(mem)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewDate32Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestDate64StringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -2917,6 +3183,30 @@ func TestDate64Builder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
+func TestDate64BuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ bldr := array.NewDate64Builder(mem)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewDate64Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
+
func TestDurationStringRoundTrip(t *testing.T) {
// 1. create array
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
@@ -3123,3 +3413,28 @@ func TestDurationBuilder_Resize(t *testing.T) {
ab.Resize(32)
assert.Equal(t, 5, ab.Len())
}
+
+func TestDurationBuilderUnmarshalJSON(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ dtype := &arrow.DurationType{Unit: arrow.Second}
+ bldr := array.NewDurationBuilder(mem, dtype)
+ defer bldr.Release()
+
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.NewDurationArray()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+}
diff --git a/arrow/array/numericbuilder.gen_test.go.tmpl
b/arrow/array/numericbuilder.gen_test.go.tmpl
index a5d58f4..86cc74a 100644
--- a/arrow/array/numericbuilder.gen_test.go.tmpl
+++ b/arrow/array/numericbuilder.gen_test.go.tmpl
@@ -276,9 +276,17 @@ func Test{{.Name}}BuilderUnmarshalJSON(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
+{{if .Opt.Parametric -}}
+ dtype := &arrow.{{.Name}}Type{Unit: arrow.Second}
+ bldr := array.New{{.Name}}Builder(mem, dtype)
+{{else}}
bldr := array.New{{.Name}}Builder(mem)
+{{end -}}
+
defer bldr.Release()
+
+{{ if or (eq .Name "Float64") (eq .Name "Float32") -}}
jsonstr := `[0, 1, "+Inf", 2, 3, "NaN", "NaN", 4, 5, "-Inf"]`
err := bldr.UnmarshalJSON([]byte(jsonstr))
@@ -292,6 +300,23 @@ func Test{{.Name}}BuilderUnmarshalJSON(t *testing.T) {
assert.False(t, math.IsInf(float64(arr.Value(0)), 0), arr.Value(0))
assert.True(t, math.IsInf(float64(arr.Value(2)), 1), arr.Value(2))
assert.True(t, math.IsNaN(float64(arr.Value(5))), arr.Value(5))
+{{else}}
+ jsonstr := `[0, 1, null, 2.3, -11]`
+
+ err := bldr.UnmarshalJSON([]byte(jsonstr))
+ assert.NoError(t, err)
+
+ arr := bldr.New{{.Name}}Array()
+ defer arr.Release()
+
+ assert.NotNil(t, arr)
+
+ assert.Equal(t, int64(0), int64(arr.Value(0)))
+ assert.Equal(t, int64(1), int64(arr.Value(1)))
+ assert.True(t, arr.IsNull(2))
+ assert.Equal(t, int64(2), int64(arr.Value(3)))
+ assert.Equal(t, int64(5), int64(arr.Len()))
+{{end -}}
}
{{end}}
diff --git a/arrow/array/record.go b/arrow/array/record.go
index f29c896..18a50ed 100644
--- a/arrow/array/record.go
+++ b/arrow/array/record.go
@@ -43,7 +43,7 @@ type RecordReader interface {
// simpleRecords is a simple iterator over a collection of records.
type simpleRecords struct {
- refCount int64
+ refCount atomic.Int64
schema *arrow.Schema
recs []arrow.Record
@@ -53,11 +53,11 @@ type simpleRecords struct {
// NewRecordReader returns a simple iterator over the given slice of records.
func NewRecordReader(schema *arrow.Schema, recs []arrow.Record) (RecordReader,
error) {
rs := &simpleRecords{
- refCount: 1,
- schema: schema,
- recs: recs,
- cur: nil,
+ schema: schema,
+ recs: recs,
+ cur: nil,
}
+ rs.refCount.Add(1)
for _, rec := range rs.recs {
rec.Retain()
@@ -76,16 +76,16 @@ func NewRecordReader(schema *arrow.Schema, recs
[]arrow.Record) (RecordReader, e
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (rs *simpleRecords) Retain() {
- atomic.AddInt64(&rs.refCount, 1)
+ rs.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (rs *simpleRecords) Release() {
- debug.Assert(atomic.LoadInt64(&rs.refCount) > 0, "too many releases")
+ debug.Assert(rs.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&rs.refCount, -1) == 0 {
+ if rs.refCount.Add(-1) == 0 {
if rs.cur != nil {
rs.cur.Release()
}
@@ -113,7 +113,7 @@ func (rs *simpleRecords) Err() error { return nil }
// simpleRecord is a basic, non-lazy in-memory record batch.
type simpleRecord struct {
- refCount int64
+ refCount atomic.Int64
schema *arrow.Schema
@@ -127,11 +127,12 @@ type simpleRecord struct {
// NewRecord panics if rows is larger than the height of the columns.
func NewRecord(schema *arrow.Schema, cols []arrow.Array, nrows int64)
arrow.Record {
rec := &simpleRecord{
- refCount: 1,
- schema: schema,
- rows: nrows,
- arrs: make([]arrow.Array, len(cols)),
+ schema: schema,
+ rows: nrows,
+ arrs: make([]arrow.Array, len(cols)),
}
+ rec.refCount.Add(1)
+
copy(rec.arrs, cols)
for _, arr := range rec.arrs {
arr.Retain()
@@ -211,16 +212,16 @@ func (rec *simpleRecord) validate() error {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (rec *simpleRecord) Retain() {
- atomic.AddInt64(&rec.refCount, 1)
+ rec.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (rec *simpleRecord) Release() {
- debug.Assert(atomic.LoadInt64(&rec.refCount) > 0, "too many releases")
+ debug.Assert(rec.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&rec.refCount, -1) == 0 {
+ if rec.refCount.Add(-1) == 0 {
for _, arr := range rec.arrs {
arr.Release()
}
@@ -274,7 +275,7 @@ func (rec *simpleRecord) MarshalJSON() ([]byte, error) {
// RecordBuilder eases the process of building a Record, iteratively, from
// a known Schema.
type RecordBuilder struct {
- refCount int64
+ refCount atomic.Int64
mem memory.Allocator
schema *arrow.Schema
fields []Builder
@@ -283,11 +284,11 @@ type RecordBuilder struct {
// NewRecordBuilder returns a builder, using the provided memory allocator and
a schema.
func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema)
*RecordBuilder {
b := &RecordBuilder{
- refCount: 1,
- mem: mem,
- schema: schema,
- fields: make([]Builder, schema.NumFields()),
+ mem: mem,
+ schema: schema,
+ fields: make([]Builder, schema.NumFields()),
}
+ b.refCount.Add(1)
for i := 0; i < schema.NumFields(); i++ {
b.fields[i] = NewBuilder(b.mem, schema.Field(i).Type)
@@ -299,14 +300,14 @@ func NewRecordBuilder(mem memory.Allocator, schema
*arrow.Schema) *RecordBuilder
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (b *RecordBuilder) Retain() {
- atomic.AddInt64(&b.refCount, 1)
+ b.refCount.Add(1)
}
// Release decreases the reference count by 1.
func (b *RecordBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
for _, f := range b.fields {
f.Release()
}
diff --git a/arrow/array/string.go b/arrow/array/string.go
index 6ca6f4e..d42492d 100644
--- a/arrow/array/string.go
+++ b/arrow/array/string.go
@@ -44,7 +44,7 @@ type String struct {
// NewStringData constructs a new String array from data.
func NewStringData(data arrow.ArrayData) *String {
a := &String{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -191,7 +191,7 @@ type LargeString struct {
// NewStringData constructs a new String array from data.
func NewLargeStringData(data arrow.ArrayData) *LargeString {
a := &LargeString{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -332,7 +332,7 @@ type StringView struct {
func NewStringViewData(data arrow.ArrayData) *StringView {
a := &StringView{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
diff --git a/arrow/array/struct.go b/arrow/array/struct.go
index 7f65f8d..564ae09 100644
--- a/arrow/array/struct.go
+++ b/arrow/array/struct.go
@@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"strings"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
@@ -107,7 +106,7 @@ func NewStructArrayWithNulls(cols []arrow.Array, names
[]string, nullBitmap *mem
// NewStructData returns a new Struct array value from data.
func NewStructData(data arrow.ArrayData) *Struct {
a := &Struct{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -256,10 +255,12 @@ type StructBuilder struct {
// NewStructBuilder returns a builder, using the provided memory allocator.
func NewStructBuilder(mem memory.Allocator, dtype *arrow.StructType)
*StructBuilder {
b := &StructBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
dtype: dtype,
fields: make([]Builder, dtype.NumFields()),
}
+ b.refCount.Add(1)
+
for i, f := range dtype.Fields() {
b.fields[i] = NewBuilder(b.mem, f.Type)
}
@@ -278,9 +279,9 @@ func (b *StructBuilder) Type() arrow.DataType {
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *StructBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
diff --git a/arrow/array/table.go b/arrow/array/table.go
index 95ac67f..367b1b1 100644
--- a/arrow/array/table.go
+++ b/arrow/array/table.go
@@ -85,7 +85,7 @@ func NewChunkedSlice(a *arrow.Chunked, i, j int64)
*arrow.Chunked {
// simpleTable is a basic, non-lazy in-memory table.
type simpleTable struct {
- refCount int64
+ refCount atomic.Int64
rows int64
cols []arrow.Column
@@ -101,11 +101,11 @@ type simpleTable struct {
// NewTable panics if rows is larger than the height of the columns.
func NewTable(schema *arrow.Schema, cols []arrow.Column, rows int64)
arrow.Table {
tbl := simpleTable{
- refCount: 1,
- rows: rows,
- cols: cols,
- schema: schema,
+ rows: rows,
+ cols: cols,
+ schema: schema,
}
+ tbl.refCount.Add(1)
if tbl.rows < 0 {
switch len(tbl.cols) {
@@ -150,11 +150,11 @@ func NewTableFromSlice(schema *arrow.Schema, data
[][]arrow.Array) arrow.Table {
}
tbl := simpleTable{
- refCount: 1,
- schema: schema,
- cols: cols,
- rows: int64(cols[0].Len()),
+ schema: schema,
+ cols: cols,
+ rows: int64(cols[0].Len()),
}
+ tbl.refCount.Add(1)
defer func() {
if r := recover(); r != nil {
@@ -241,16 +241,16 @@ func (tbl *simpleTable) validate() {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (tbl *simpleTable) Retain() {
- atomic.AddInt64(&tbl.refCount, 1)
+ tbl.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (tbl *simpleTable) Release() {
- debug.Assert(atomic.LoadInt64(&tbl.refCount) > 0, "too many releases")
+ debug.Assert(tbl.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&tbl.refCount, -1) == 0 {
+ if tbl.refCount.Add(-1) == 0 {
for i := range tbl.cols {
tbl.cols[i].Release()
}
@@ -279,7 +279,7 @@ func (tbl *simpleTable) String() string {
// TableReader is a Record iterator over a (possibly chunked) Table
type TableReader struct {
- refCount int64
+ refCount atomic.Int64
tbl arrow.Table
cur int64 // current row
@@ -297,15 +297,15 @@ type TableReader struct {
func NewTableReader(tbl arrow.Table, chunkSize int64) *TableReader {
ncols := tbl.NumCols()
tr := &TableReader{
- refCount: 1,
- tbl: tbl,
- cur: 0,
- max: int64(tbl.NumRows()),
- chksz: chunkSize,
- chunks: make([]*arrow.Chunked, ncols),
- slots: make([]int, ncols),
- offsets: make([]int64, ncols),
+ tbl: tbl,
+ cur: 0,
+ max: int64(tbl.NumRows()),
+ chksz: chunkSize,
+ chunks: make([]*arrow.Chunked, ncols),
+ slots: make([]int, ncols),
+ offsets: make([]int64, ncols),
}
+ tr.refCount.Add(1)
tr.tbl.Retain()
if tr.chksz <= 0 {
@@ -383,16 +383,16 @@ func (tr *TableReader) Next() bool {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (tr *TableReader) Retain() {
- atomic.AddInt64(&tr.refCount, 1)
+ tr.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (tr *TableReader) Release() {
- debug.Assert(atomic.LoadInt64(&tr.refCount) > 0, "too many releases")
+ debug.Assert(tr.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&tr.refCount, -1) == 0 {
+ if tr.refCount.Add(-1) == 0 {
tr.tbl.Release()
for _, chk := range tr.chunks {
chk.Release()
diff --git a/arrow/array/timestamp.go b/arrow/array/timestamp.go
index 74b1271..164caac 100644
--- a/arrow/array/timestamp.go
+++ b/arrow/array/timestamp.go
@@ -21,7 +21,6 @@ import (
"fmt"
"reflect"
"strings"
- "sync/atomic"
"time"
"github.com/apache/arrow-go/v18/arrow"
@@ -40,7 +39,7 @@ type Timestamp struct {
// NewTimestampData creates a new Timestamp from Data.
func NewTimestampData(data arrow.ArrayData) *Timestamp {
a := &Timestamp{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -132,7 +131,9 @@ type TimestampBuilder struct {
}
func NewTimestampBuilder(mem memory.Allocator, dtype *arrow.TimestampType)
*TimestampBuilder {
- return &TimestampBuilder{builder: builder{refCount: 1, mem: mem},
dtype: dtype}
+ tb := &TimestampBuilder{builder: builder{mem: mem}, dtype: dtype}
+ tb.refCount.Add(1)
+ return tb
}
func (b *TimestampBuilder) Type() arrow.DataType { return b.dtype }
@@ -140,9 +141,9 @@ func (b *TimestampBuilder) Type() arrow.DataType { return
b.dtype }
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
func (b *TimestampBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.nullBitmap != nil {
b.nullBitmap.Release()
b.nullBitmap = nil
diff --git a/arrow/array/union.go b/arrow/array/union.go
index 6f3a9a6..edb3625 100644
--- a/arrow/array/union.go
+++ b/arrow/array/union.go
@@ -23,7 +23,6 @@ import (
"math"
"reflect"
"strings"
- "sync/atomic"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/bitutil"
@@ -246,7 +245,7 @@ func NewSparseUnion(dt *arrow.SparseUnionType, length int,
children []arrow.Arra
// NewSparseUnionData constructs a SparseUnion array from the given ArrayData
object.
func NewSparseUnionData(data arrow.ArrayData) *SparseUnion {
a := &SparseUnion{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -506,7 +505,7 @@ func NewDenseUnion(dt *arrow.DenseUnionType, length int,
children []arrow.Array,
// NewDenseUnionData constructs a DenseUnion array from the given ArrayData
object.
func NewDenseUnionData(data arrow.ArrayData) *DenseUnion {
a := &DenseUnion{}
- a.refCount = 1
+ a.refCount.Add(1)
a.setData(data.(*Data))
return a
}
@@ -741,7 +740,7 @@ func newUnionBuilder(mem memory.Allocator, children
[]Builder, typ arrow.UnionTy
children = make([]Builder, 0)
}
b := unionBuilder{
- builder: builder{refCount: 1, mem: mem},
+ builder: builder{mem: mem},
mode: typ.Mode(),
codes: typ.TypeCodes(),
children: children,
@@ -750,6 +749,7 @@ func newUnionBuilder(mem memory.Allocator, children
[]Builder, typ arrow.UnionTy
childFields: make([]arrow.Field, len(children)),
typesBuilder: newInt8BufferBuilder(mem),
}
+ b.refCount.Add(1)
b.typeIDtoChildID[0] = arrow.InvalidUnionChildID
for i := 1; i < len(b.typeIDtoChildID); i *= 2 {
@@ -795,9 +795,9 @@ func (b *unionBuilder) reserve(elements int, resize
func(int)) {
}
func (b *unionBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
for _, c := range b.children {
c.Release()
}
@@ -854,7 +854,6 @@ func (b *unionBuilder) nextTypeID() arrow.UnionTypeCode {
id := b.denseTypeID
b.denseTypeID++
return id
-
}
func (b *unionBuilder) newData() *Data {
@@ -1228,9 +1227,9 @@ func (b *DenseUnionBuilder) Append(nextType
arrow.UnionTypeCode) {
}
func (b *DenseUnionBuilder) Release() {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
for _, c := range b.children {
c.Release()
}
diff --git a/arrow/avro/reader.go b/arrow/avro/reader.go
index c19c9ed..023eabd 100644
--- a/arrow/avro/reader.go
+++ b/arrow/avro/reader.go
@@ -54,7 +54,7 @@ type OCFReader struct {
avroSchemaEdits []schemaEdit
schema *arrow.Schema
- refs int64
+ refs atomic.Int64
bld *array.RecordBuilder
bldMap *fieldPos
ldr *dataLoader
@@ -89,11 +89,12 @@ func NewOCFReader(r io.Reader, opts ...Option) (*OCFReader,
error) {
rr := &OCFReader{
r: ocfr,
- refs: 1,
chunk: 1,
avroChanSize: 500,
recChanSize: 10,
}
+ rr.refs.Add(1)
+
for _, opt := range opts {
opt(rr)
}
@@ -318,16 +319,16 @@ func WithChunk(n int) Option {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (r *OCFReader) Retain() {
- atomic.AddInt64(&r.refs, 1)
+ r.refs.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (r *OCFReader) Release() {
- debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")
+ debug.Assert(r.refs.Load() > 0, "too many releases")
- if atomic.AddInt64(&r.refs, -1) == 0 {
+ if r.refs.Add(-1) == 0 {
if r.cur != nil {
r.cur.Release()
}
diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go
index 86f2b50..b56edb4 100644
--- a/arrow/cdata/cdata.go
+++ b/arrow/cdata/cdata.go
@@ -194,7 +194,8 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field,
err error) {
ret.Type = &arrow.DictionaryType{
IndexType: ret.Type,
ValueType: valueField.Type,
- Ordered: schema.dictionary.flags&C.ARROW_FLAG_DICTIONARY_ORDERED != 0}
+ Ordered: schema.dictionary.flags&C.ARROW_FLAG_DICTIONARY_ORDERED != 0,
+ }
}
return
@@ -460,7 +461,7 @@ func (imp *cimporter) doImportArr(src *CArrowArray) error {
// struct immediately after import, since we have no imported
// memory that we have to track the lifetime of.
defer func() {
- if imp.alloc.bufCount == 0 {
+ if imp.alloc.bufCount.Load() == 0 {
C.ArrowArrayRelease(imp.arr)
C.free(unsafe.Pointer(imp.arr))
}
@@ -662,9 +663,7 @@ func (imp *cimporter) importStringLike(offsetByteWidth
int64) (err error) {
return
}
- var (
- nulls, offsets, values *memory.Buffer
- )
+ var nulls, offsets, values *memory.Buffer
if nulls, err = imp.importNullBitmap(0); err != nil {
return
}
diff --git a/arrow/cdata/import_allocator.go b/arrow/cdata/import_allocator.go
index ba5edf4..d2cc44b 100644
--- a/arrow/cdata/import_allocator.go
+++ b/arrow/cdata/import_allocator.go
@@ -28,13 +28,13 @@ import (
import "C"
type importAllocator struct {
- bufCount int64
+ bufCount atomic.Int64
arr *CArrowArray
}
func (i *importAllocator) addBuffer() {
- atomic.AddInt64(&i.bufCount, 1)
+ i.bufCount.Add(1)
}
func (*importAllocator) Allocate(int) []byte {
@@ -46,9 +46,9 @@ func (*importAllocator) Reallocate(int, []byte) []byte {
}
func (i *importAllocator) Free([]byte) {
- debug.Assert(atomic.LoadInt64(&i.bufCount) > 0, "too many releases")
+ debug.Assert(i.bufCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&i.bufCount, -1) == 0 {
+ if i.bufCount.Add(-1) == 0 {
defer C.free(unsafe.Pointer(i.arr))
C.ArrowArrayRelease(i.arr)
if C.ArrowArrayIsReleased(i.arr) != 1 {
diff --git a/arrow/compute/exec/utils.go b/arrow/compute/exec/utils.go
index e368520..58c4c0c 100644
--- a/arrow/compute/exec/utils.go
+++ b/arrow/compute/exec/utils.go
@@ -158,7 +158,7 @@ func RechunkArraysConsistently(groups [][]arrow.Array)
[][]arrow.Array {
type ChunkResolver struct {
offsets []int64
- cached int64
+ cached atomic.Int64
}
func NewChunkResolver(chunks []arrow.Array) *ChunkResolver {
@@ -184,7 +184,7 @@ func (c *ChunkResolver) Resolve(idx int64) (chunk, index
int64) {
return 0, idx
}
- cached := atomic.LoadInt64(&c.cached)
+ cached := c.cached.Load()
cacheHit := idx >= c.offsets[cached] && idx < c.offsets[cached+1]
if cacheHit {
return cached, idx - c.offsets[cached]
@@ -196,7 +196,7 @@ func (c *ChunkResolver) Resolve(idx int64) (chunk, index
int64) {
}
chunk, index = int64(chkIdx), idx-c.offsets[chkIdx]
- atomic.StoreInt64(&c.cached, chunk)
+ c.cached.Store(chunk)
return
}
@@ -214,7 +214,8 @@ type BoolIter struct {
func NewBoolIter(arr *ArraySpan) ArrayIter[bool] {
return &BoolIter{
- Rdr: bitutil.NewBitmapReader(arr.Buffers[1].Buf, int(arr.Offset), int(arr.Len))}
+ Rdr: bitutil.NewBitmapReader(arr.Buffers[1].Buf, int(arr.Offset), int(arr.Len)),
+ }
}
func (b *BoolIter) Next() (out bool) {
diff --git a/arrow/csv/reader.go b/arrow/csv/reader.go
index dd0c0f1..db0f836 100644
--- a/arrow/csv/reader.go
+++ b/arrow/csv/reader.go
@@ -43,7 +43,7 @@ type Reader struct {
r *csv.Reader
schema *arrow.Schema
- refs int64
+ refs atomic.Int64
bld *array.RecordBuilder
cur arrow.Record
err error
@@ -75,10 +75,10 @@ type Reader struct {
func NewInferringReader(r io.Reader, opts ...Option) *Reader {
rr := &Reader{
r: csv.NewReader(r),
- refs: 1,
chunk: 1,
stringsCanBeNull: false,
}
+ rr.refs.Add(1)
rr.r.ReuseRecord = true
for _, opt := range opts {
opt(rr)
@@ -111,10 +111,10 @@ func NewReader(r io.Reader, schema *arrow.Schema, opts
...Option) *Reader {
rr := &Reader{
r: csv.NewReader(r),
schema: schema,
- refs: 1,
chunk: 1,
stringsCanBeNull: false,
}
+ rr.refs.Add(1)
rr.r.ReuseRecord = true
for _, opt := range opts {
opt(rr)
@@ -288,9 +288,7 @@ func (r *Reader) nextall() bool {
r.done = true
}()
- var (
- recs [][]string
- )
+ var recs [][]string
recs, r.err = r.r.ReadAll()
if r.err != nil {
@@ -926,16 +924,16 @@ func (r *Reader) parseExtension(field array.Builder, str
string) {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (r *Reader) Retain() {
- atomic.AddInt64(&r.refs, 1)
+ r.refs.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (r *Reader) Release() {
- debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")
+ debug.Assert(r.refs.Load() > 0, "too many releases")
- if atomic.AddInt64(&r.refs, -1) == 0 {
+ if r.refs.Add(-1) == 0 {
if r.cur != nil {
r.cur.Release()
}
@@ -1025,6 +1023,4 @@ func tryParse(val string, dt arrow.DataType) error {
panic("shouldn't end up here")
}
-var (
- _ array.RecordReader = (*Reader)(nil)
-)
+var _ array.RecordReader = (*Reader)(nil)
diff --git a/arrow/flight/flightsql/example/sql_batch_reader.go b/arrow/flight/flightsql/example/sql_batch_reader.go
index 5a6cafe..e0892e0 100644
--- a/arrow/flight/flightsql/example/sql_batch_reader.go
+++ b/arrow/flight/flightsql/example/sql_batch_reader.go
@@ -99,7 +99,7 @@ func getArrowType(c *sql.ColumnType) arrow.DataType {
const maxBatchSize = 1024
type SqlBatchReader struct {
- refCount int64
+ refCount atomic.Int64
schema *arrow.Schema
rows *sql.Rows
@@ -152,12 +152,15 @@ func NewSqlBatchReaderWithSchema(mem memory.Allocator,
schema *arrow.Schema, row
}
}
- return &SqlBatchReader{
- refCount: 1,
- bldr: array.NewRecordBuilder(mem, schema),
- schema: schema,
- rowdest: rowdest,
- rows: rows}, nil
+ sqb := &SqlBatchReader{
+ bldr: array.NewRecordBuilder(mem, schema),
+ schema: schema,
+ rowdest: rowdest,
+ rows: rows,
+ }
+
+ sqb.refCount.Add(1)
+ return sqb, nil
}
func NewSqlBatchReader(mem memory.Allocator, rows *sql.Rows) (*SqlBatchReader,
error) {
@@ -219,22 +222,24 @@ func NewSqlBatchReader(mem memory.Allocator, rows
*sql.Rows) (*SqlBatchReader, e
}
schema := arrow.NewSchema(fields, nil)
- return &SqlBatchReader{
- refCount: 1,
- bldr: array.NewRecordBuilder(mem, schema),
- schema: schema,
- rowdest: rowdest,
- rows: rows}, nil
+ sbr := &SqlBatchReader{
+ bldr: array.NewRecordBuilder(mem, schema),
+ schema: schema,
+ rowdest: rowdest,
+ rows: rows,
+ }
+ sbr.refCount.Add(1)
+ return sbr, nil
}
func (r *SqlBatchReader) Retain() {
- atomic.AddInt64(&r.refCount, 1)
+ r.refCount.Add(1)
}
func (r *SqlBatchReader) Release() {
- debug.Assert(atomic.LoadInt64(&r.refCount) > 0, "too many releases")
+ debug.Assert(r.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&r.refCount, -1) == 0 {
+ if r.refCount.Add(-1) == 0 {
r.rows.Close()
r.rows, r.schema, r.rowdest = nil, nil, nil
r.bldr.Release()
diff --git a/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go b/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go
index 3009635..55b2390 100644
--- a/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go
+++ b/arrow/flight/flightsql/example/sqlite_tables_schema_batch_reader.go
@@ -35,7 +35,7 @@ import (
)
type SqliteTablesSchemaBatchReader struct {
- refCount int64
+ refCount atomic.Int64
mem memory.Allocator
ctx context.Context
@@ -57,24 +57,25 @@ func NewSqliteTablesSchemaBatchReader(ctx context.Context,
mem memory.Allocator,
return nil, err
}
- return &SqliteTablesSchemaBatchReader{
- refCount: 1,
+ stsbr := &SqliteTablesSchemaBatchReader{
ctx: ctx,
rdr: rdr,
stmt: stmt,
mem: mem,
schemaBldr: array.NewBinaryBuilder(mem,
arrow.BinaryTypes.Binary),
- }, nil
+ }
+ stsbr.refCount.Add(1)
+ return stsbr, nil
}
func (s *SqliteTablesSchemaBatchReader) Err() error { return s.err }
-func (s *SqliteTablesSchemaBatchReader) Retain() { atomic.AddInt64(&s.refCount, 1) }
+func (s *SqliteTablesSchemaBatchReader) Retain() { s.refCount.Add(1) }
func (s *SqliteTablesSchemaBatchReader) Release() {
- debug.Assert(atomic.LoadInt64(&s.refCount) > 0, "too many releases")
+ debug.Assert(s.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&s.refCount, -1) == 0 {
+ if s.refCount.Add(-1) == 0 {
s.rdr.Release()
s.stmt.Close()
s.schemaBldr.Release()
diff --git a/arrow/flight/record_batch_reader.go b/arrow/flight/record_batch_reader.go
index c6596e8..c65d89f 100644
--- a/arrow/flight/record_batch_reader.go
+++ b/arrow/flight/record_batch_reader.go
@@ -40,7 +40,7 @@ type dataMessageReader struct {
rdr DataStreamReader
peeked *FlightData
- refCount int64
+ refCount atomic.Int64
msg *ipc.Message
lastAppMetadata []byte
@@ -78,13 +78,13 @@ func (d *dataMessageReader) Message() (*ipc.Message, error)
{
}
func (d *dataMessageReader) Retain() {
- atomic.AddInt64(&d.refCount, 1)
+ d.refCount.Add(1)
}
func (d *dataMessageReader) Release() {
- debug.Assert(atomic.LoadInt64(&d.refCount) > 0, "too many releases")
+ debug.Assert(d.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&d.refCount, -1) == 0 {
+ if d.refCount.Add(-1) == 0 {
if d.msg != nil {
d.msg.Release()
d.msg = nil
@@ -154,7 +154,8 @@ func NewRecordReader(r DataStreamReader, opts
...ipc.Option) (*Reader, error) {
return nil, err
}
- rdr := &Reader{dmr: &dataMessageReader{rdr: r, refCount: 1}}
+ rdr := &Reader{dmr: &dataMessageReader{rdr: r}}
+ rdr.dmr.refCount.Add(1)
rdr.dmr.descr = data.FlightDescriptor
if len(data.DataHeader) > 0 {
rdr.dmr.peeked = data
diff --git a/arrow/internal/arrjson/reader.go b/arrow/internal/arrjson/reader.go
index 0db9474..ec021fc 100644
--- a/arrow/internal/arrjson/reader.go
+++ b/arrow/internal/arrjson/reader.go
@@ -28,7 +28,7 @@ import (
)
type Reader struct {
- refs int64
+ refs atomic.Int64
schema *arrow.Schema
recs []arrow.Record
@@ -55,27 +55,27 @@ func NewReader(r io.Reader, opts ...Option) (*Reader,
error) {
schema := schemaFromJSON(raw.Schema, &memo)
dictionariesFromJSON(cfg.alloc, raw.Dictionaries, &memo)
rr := &Reader{
- refs: 1,
schema: schema,
recs: recordsFromJSON(cfg.alloc, schema, raw.Records, &memo),
memo: &memo,
}
+ rr.refs.Add(1)
return rr, nil
}
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (r *Reader) Retain() {
- atomic.AddInt64(&r.refs, 1)
+ r.refs.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (r *Reader) Release() {
- debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases")
+ debug.Assert(r.refs.Load() > 0, "too many releases")
- if atomic.AddInt64(&r.refs, -1) == 0 {
+ if r.refs.Add(-1) == 0 {
for i, rec := range r.recs {
if r.recs[i] != nil {
rec.Release()
@@ -106,6 +106,4 @@ func (r *Reader) ReadAt(index int) (arrow.Record, error) {
return rec, nil
}
-var (
- _ arrio.Reader = (*Reader)(nil)
-)
+var _ arrio.Reader = (*Reader)(nil)
diff --git a/arrow/ipc/message.go b/arrow/ipc/message.go
index f989cf2..c96869e 100644
--- a/arrow/ipc/message.go
+++ b/arrow/ipc/message.go
@@ -66,7 +66,7 @@ func (m MessageType) String() string {
// Message is an IPC message, including metadata and body.
type Message struct {
- refCount int64
+ refCount atomic.Int64
msg *flatbuf.Message
meta *memory.Buffer
body *memory.Buffer
@@ -80,12 +80,13 @@ func NewMessage(meta, body *memory.Buffer) *Message {
}
meta.Retain()
body.Retain()
- return &Message{
- refCount: 1,
- msg: flatbuf.GetRootAsMessage(meta.Bytes(), 0),
- meta: meta,
- body: body,
+ m := &Message{
+ msg: flatbuf.GetRootAsMessage(meta.Bytes(), 0),
+ meta: meta,
+ body: body,
}
+ m.refCount.Add(1)
+ return m
}
func newMessageFromFB(meta *flatbuf.Message, body *memory.Buffer) *Message {
@@ -93,27 +94,28 @@ func newMessageFromFB(meta *flatbuf.Message, body
*memory.Buffer) *Message {
panic("arrow/ipc: nil buffers")
}
body.Retain()
- return &Message{
- refCount: 1,
- msg: meta,
- meta: memory.NewBufferBytes(meta.Table().Bytes),
- body: body,
+ m := &Message{
+ msg: meta,
+ meta: memory.NewBufferBytes(meta.Table().Bytes),
+ body: body,
}
+ m.refCount.Add(1)
+ return m
}
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (msg *Message) Retain() {
- atomic.AddInt64(&msg.refCount, 1)
+ msg.refCount.Add(1)
}
// Release decreases the reference count by 1.
// Release may be called simultaneously from multiple goroutines.
// When the reference count goes to zero, the memory is freed.
func (msg *Message) Release() {
- debug.Assert(atomic.LoadInt64(&msg.refCount) > 0, "too many releases")
+ debug.Assert(msg.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&msg.refCount, -1) == 0 {
+ if msg.refCount.Add(-1) == 0 {
msg.meta.Release()
msg.body.Release()
msg.msg = nil
@@ -144,7 +146,7 @@ type MessageReader interface {
type messageReader struct {
r io.Reader
- refCount int64
+ refCount atomic.Int64
msg *Message
mem memory.Allocator
@@ -157,22 +159,24 @@ func NewMessageReader(r io.Reader, opts ...Option)
MessageReader {
opt(cfg)
}
- return &messageReader{r: r, refCount: 1, mem: cfg.alloc}
+ mr := &messageReader{r: r, mem: cfg.alloc}
+ mr.refCount.Add(1)
+ return mr
}
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (r *messageReader) Retain() {
- atomic.AddInt64(&r.refCount, 1)
+ r.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (r *messageReader) Release() {
- debug.Assert(atomic.LoadInt64(&r.refCount) > 0, "too many releases")
+ debug.Assert(r.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&r.refCount, -1) == 0 {
+ if r.refCount.Add(-1) == 0 {
if r.msg != nil {
r.msg.Release()
r.msg = nil
@@ -184,7 +188,7 @@ func (r *messageReader) Release() {
// underlying stream.
// It is valid until the next call to Message.
func (r *messageReader) Message() (*Message, error) {
- var buf = make([]byte, 4)
+ buf := make([]byte, 4)
_, err := io.ReadFull(r.r, buf)
if err != nil {
return nil, fmt.Errorf("arrow/ipc: could not read continuation indicator: %w", err)
diff --git a/arrow/memory/buffer.go b/arrow/memory/buffer.go
index 0472222..592da70 100644
--- a/arrow/memory/buffer.go
+++ b/arrow/memory/buffer.go
@@ -24,7 +24,7 @@ import (
// Buffer is a wrapper type for a buffer of bytes.
type Buffer struct {
- refCount int64
+ refCount atomic.Int64
buf []byte
length int
mutable bool
@@ -42,22 +42,28 @@ type Buffer struct {
// through the c data interface and tracking the lifetime of the
// imported buffers.
func NewBufferWithAllocator(data []byte, mem Allocator) *Buffer {
- return &Buffer{refCount: 1, buf: data, length: len(data), mem: mem}
+ b := &Buffer{buf: data, length: len(data), mem: mem}
+ b.refCount.Add(1)
+ return b
}
// NewBufferBytes creates a fixed-size buffer from the specified data.
func NewBufferBytes(data []byte) *Buffer {
- return &Buffer{refCount: 0, buf: data, length: len(data)}
+ return &Buffer{buf: data, length: len(data)}
}
// NewResizableBuffer creates a mutable, resizable buffer with an Allocator for managing memory.
func NewResizableBuffer(mem Allocator) *Buffer {
- return &Buffer{refCount: 1, mutable: true, mem: mem}
+ b := &Buffer{mutable: true, mem: mem}
+ b.refCount.Add(1)
+ return b
}
func SliceBuffer(buf *Buffer, offset, length int) *Buffer {
buf.Retain()
- return &Buffer{refCount: 1, parent: buf, buf: buf.Bytes()[offset : offset+length], length: length}
+ b := &Buffer{parent: buf, buf: buf.Bytes()[offset : offset+length], length: length}
+ b.refCount.Add(1)
+ return b
}
// Parent returns either nil or a pointer to the parent buffer if this buffer
@@ -67,7 +73,7 @@ func (b *Buffer) Parent() *Buffer { return b.parent }
// Retain increases the reference count by 1.
func (b *Buffer) Retain() {
if b.mem != nil || b.parent != nil {
- atomic.AddInt64(&b.refCount, 1)
+ b.refCount.Add(1)
}
}
@@ -75,9 +81,9 @@ func (b *Buffer) Retain() {
// When the reference count goes to zero, the memory is freed.
func (b *Buffer) Release() {
if b.mem != nil || b.parent != nil {
- debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases")
+ debug.Assert(b.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&b.refCount, -1) == 0 {
+ if b.refCount.Add(-1) == 0 {
if b.mem != nil {
b.mem.Free(b.buf)
} else {
diff --git a/arrow/memory/checked_allocator.go b/arrow/memory/checked_allocator.go
index 78a09a5..103a085 100644
--- a/arrow/memory/checked_allocator.go
+++ b/arrow/memory/checked_allocator.go
@@ -32,7 +32,7 @@ import (
type CheckedAllocator struct {
mem Allocator
- sz int64
+ sz atomic.Int64
allocs sync.Map
}
@@ -41,10 +41,10 @@ func NewCheckedAllocator(mem Allocator) *CheckedAllocator {
return &CheckedAllocator{mem: mem}
}
-func (a *CheckedAllocator) CurrentAlloc() int { return int(atomic.LoadInt64(&a.sz)) }
+func (a *CheckedAllocator) CurrentAlloc() int { return int(a.sz.Load()) }
func (a *CheckedAllocator) Allocate(size int) []byte {
- atomic.AddInt64(&a.sz, int64(size))
+ a.sz.Add(int64(size))
out := a.mem.Allocate(size)
if size == 0 {
return out
@@ -66,7 +66,7 @@ func (a *CheckedAllocator) Allocate(size int) []byte {
}
func (a *CheckedAllocator) Reallocate(size int, b []byte) []byte {
- atomic.AddInt64(&a.sz, int64(size-len(b)))
+ a.sz.Add(int64(size - len(b)))
oldptr := uintptr(unsafe.Pointer(&b[0]))
out := a.mem.Reallocate(size, b)
@@ -92,7 +92,7 @@ func (a *CheckedAllocator) Reallocate(size int, b []byte)
[]byte {
}
func (a *CheckedAllocator) Free(b []byte) {
- atomic.AddInt64(&a.sz, int64(len(b)*-1))
+ a.sz.Add(int64(len(b) * -1))
defer a.mem.Free(b)
if len(b) == 0 {
@@ -192,9 +192,9 @@ func (a *CheckedAllocator) AssertSize(t TestingT, sz int) {
return true
})
- if int(atomic.LoadInt64(&a.sz)) != sz {
+ if int(a.sz.Load()) != sz {
t.Helper()
- t.Errorf("invalid memory size exp=%d, got=%d", sz, a.sz)
+ t.Errorf("invalid memory size exp=%d, got=%d", sz, a.sz.Load())
}
}
@@ -204,18 +204,16 @@ type CheckedAllocatorScope struct {
}
func NewCheckedAllocatorScope(alloc *CheckedAllocator) *CheckedAllocatorScope {
- sz := atomic.LoadInt64(&alloc.sz)
+ sz := alloc.sz.Load()
return &CheckedAllocatorScope{alloc: alloc, sz: int(sz)}
}
func (c *CheckedAllocatorScope) CheckSize(t TestingT) {
- sz := int(atomic.LoadInt64(&c.alloc.sz))
+ sz := int(c.alloc.sz.Load())
if c.sz != sz {
t.Helper()
t.Errorf("invalid memory size exp=%d, got=%d", c.sz, sz)
}
}
-var (
- _ Allocator = (*CheckedAllocator)(nil)
-)
+var _ Allocator = (*CheckedAllocator)(nil)
diff --git a/arrow/table.go b/arrow/table.go
index 6d19d9f..bdbf85b 100644
--- a/arrow/table.go
+++ b/arrow/table.go
@@ -79,16 +79,17 @@ func NewColumnFromArr(field Field, arr Array) Column {
}
arr.Retain()
- return Column{
+ col := Column{
field: field,
data: &Chunked{
- refCount: 1,
- chunks: []Array{arr},
- length: arr.Len(),
- nulls: arr.NullN(),
- dtype: field.Type,
+ chunks: []Array{arr},
+ length: arr.Len(),
+ nulls: arr.NullN(),
+ dtype: field.Type,
},
}
+ col.data.refCount.Add(1)
+ return col
}
// NewColumn returns a column from a field and a chunked data array.
@@ -132,7 +133,7 @@ func (col *Column) DataType() DataType { return
col.field.Type }
// Chunked manages a collection of primitives arrays as one logical large array.
type Chunked struct {
- refCount int64 // refCount must be first in the struct for 64 bit alignment and sync/atomic (https://github.com/golang/go/issues/37262)
+ refCount atomic.Int64
chunks []Array
@@ -146,10 +147,11 @@ type Chunked struct {
// NewChunked panics if the chunks do not have the same data type.
func NewChunked(dtype DataType, chunks []Array) *Chunked {
arr := &Chunked{
- chunks: make([]Array, 0, len(chunks)),
- refCount: 1,
- dtype: dtype,
+ chunks: make([]Array, 0, len(chunks)),
+ dtype: dtype,
}
+ arr.refCount.Add(1)
+
for _, chunk := range chunks {
if chunk == nil {
continue
@@ -169,16 +171,16 @@ func NewChunked(dtype DataType, chunks []Array) *Chunked {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (a *Chunked) Retain() {
- atomic.AddInt64(&a.refCount, 1)
+ a.refCount.Add(1)
}
// Release decreases the reference count by 1.
// When the reference count goes to zero, the memory is freed.
// Release may be called simultaneously from multiple goroutines.
func (a *Chunked) Release() {
- debug.Assert(atomic.LoadInt64(&a.refCount) > 0, "too many releases")
+ debug.Assert(a.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&a.refCount, -1) == 0 {
+ if a.refCount.Add(-1) == 0 {
for _, arr := range a.chunks {
arr.Release()
}
diff --git a/arrow/tensor/tensor.go b/arrow/tensor/tensor.go
index b3bdf32..70bbe57 100644
--- a/arrow/tensor/tensor.go
+++ b/arrow/tensor/tensor.go
@@ -65,7 +65,7 @@ type Interface interface {
}
type tensorBase struct {
- refCount int64
+ refCount atomic.Int64
dtype arrow.DataType
bw int64 // bytes width
data arrow.ArrayData
@@ -77,16 +77,16 @@ type tensorBase struct {
// Retain increases the reference count by 1.
// Retain may be called simultaneously from multiple goroutines.
func (tb *tensorBase) Retain() {
- atomic.AddInt64(&tb.refCount, 1)
+ tb.refCount.Add(1)
}
// Release decreases the reference count by 1.
// Release may be called simultaneously from multiple goroutines.
// When the reference count goes to zero, the memory is freed.
func (tb *tensorBase) Release() {
- debug.Assert(atomic.LoadInt64(&tb.refCount) > 0, "too many releases")
+ debug.Assert(tb.refCount.Load() > 0, "too many releases")
- if atomic.AddInt64(&tb.refCount, -1) == 0 {
+ if tb.refCount.Add(-1) == 0 {
tb.data.Release()
tb.data = nil
}
@@ -172,14 +172,14 @@ func New(data arrow.ArrayData, shape, strides []int64,
names []string) Interface
func newTensor(dtype arrow.DataType, data arrow.ArrayData, shape, strides
[]int64, names []string) *tensorBase {
tb := tensorBase{
- refCount: 1,
- dtype: dtype,
- bw: int64(dtype.(arrow.FixedWidthDataType).BitWidth()) / 8,
- data: data,
- shape: shape,
- strides: strides,
- names: names,
+ dtype: dtype,
+ bw: int64(dtype.(arrow.FixedWidthDataType).BitWidth()) / 8,
+ data: data,
+ shape: shape,
+ strides: strides,
+ names: names,
}
+ tb.refCount.Add(1)
tb.data.Retain()
if len(tb.shape) > 0 && len(tb.strides) == 0 {
diff --git a/arrow/tmpl b/arrow/tmpl
new file mode 100755
index 0000000..60df161
Binary files /dev/null and b/arrow/tmpl differ
diff --git a/parquet/file/record_reader.go b/parquet/file/record_reader.go
index e8b0dee..e2fdcc8 100644
--- a/parquet/file/record_reader.go
+++ b/parquet/file/record_reader.go
@@ -130,12 +130,12 @@ type primitiveRecordReader struct {
validBits *memory.Buffer
mem memory.Allocator
- refCount int64
+ refCount atomic.Int64
useValues bool
}
-func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator, bufferPool *sync.Pool) primitiveRecordReader {
- return primitiveRecordReader{
+func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator, bufferPool *sync.Pool) *primitiveRecordReader {
+ prr := &primitiveRecordReader{
ColumnChunkReader: newTypedColumnChunkReader(columnChunkReader{
descr: descr,
mem: mem,
@@ -144,17 +144,19 @@ func createPrimitiveRecordReader(descr *schema.Column,
mem memory.Allocator, buf
values: memory.NewResizableBuffer(mem),
validBits: memory.NewResizableBuffer(mem),
mem: mem,
- refCount: 1,
useValues: descr.PhysicalType() != parquet.Types.ByteArray &&
descr.PhysicalType() != parquet.Types.FixedLenByteArray,
}
+
+ prr.refCount.Add(1)
+ return prr
}
func (pr *primitiveRecordReader) Retain() {
- atomic.AddInt64(&pr.refCount, 1)
+ pr.refCount.Add(1)
}
func (pr *primitiveRecordReader) Release() {
- if atomic.AddInt64(&pr.refCount, -1) == 0 {
+ if pr.refCount.Add(-1) == 0 {
if pr.values != nil {
pr.values.Release()
pr.values = nil
@@ -325,7 +327,7 @@ type recordReader struct {
defLevels *memory.Buffer
repLevels *memory.Buffer
- refCount int64
+ refCount atomic.Int64
}
// binaryRecordReader is the recordReaderImpl for non-primitive data
@@ -346,22 +348,22 @@ func newRecordReader(descr *schema.Column, info
LevelInfo, mem memory.Allocator,
mem = memory.DefaultAllocator
}
- pr := createPrimitiveRecordReader(descr, mem, bufferPool)
- return &recordReader{
- refCount: 1,
- recordReaderImpl: &pr,
+ rr := &recordReader{
+ recordReaderImpl: createPrimitiveRecordReader(descr, mem, bufferPool),
leafInfo: info,
defLevels: memory.NewResizableBuffer(mem),
repLevels: memory.NewResizableBuffer(mem),
}
+ rr.refCount.Add(1)
+ return rr
}
func (rr *recordReader) Retain() {
- atomic.AddInt64(&rr.refCount, 1)
+ rr.refCount.Add(1)
}
func (rr *recordReader) Release() {
- if atomic.AddInt64(&rr.refCount, -1) == 0 {
+ if rr.refCount.Add(-1) == 0 {
rr.recordReaderImpl.Release()
rr.defLevels.Release()
rr.repLevels.Release()
@@ -761,17 +763,18 @@ func newFLBARecordReader(descr *schema.Column, info
LevelInfo, mem memory.Alloca
byteWidth := descr.TypeLength()
- return &binaryRecordReader{&recordReader{
+ brr := &binaryRecordReader{&recordReader{
recordReaderImpl: &flbaRecordReader{
- createPrimitiveRecordReader(descr, mem, bufferPool),
+ *createPrimitiveRecordReader(descr, mem, bufferPool),
array.NewFixedSizeBinaryBuilder(mem,
&arrow.FixedSizeBinaryType{ByteWidth: byteWidth}),
nil,
},
leafInfo: info,
defLevels: memory.NewResizableBuffer(mem),
repLevels: memory.NewResizableBuffer(mem),
- refCount: 1,
}}
+ brr.refCount.Add(1)
+ return brr
}
// byteArrayRecordReader is the specialization impl for byte-array columns
@@ -793,17 +796,18 @@ func newByteArrayRecordReader(descr *schema.Column, info
LevelInfo, dtype arrow.
dt = arrow.BinaryTypes.Binary
}
- return &binaryRecordReader{&recordReader{
+ brr := &binaryRecordReader{&recordReader{
recordReaderImpl: &byteArrayRecordReader{
- createPrimitiveRecordReader(descr, mem, bufferPool),
+ *createPrimitiveRecordReader(descr, mem, bufferPool),
array.NewBinaryBuilder(mem, dt),
nil,
},
leafInfo: info,
defLevels: memory.NewResizableBuffer(mem),
repLevels: memory.NewResizableBuffer(mem),
- refCount: 1,
}}
+ brr.refCount.Add(1)
+ return brr
}
func (br *byteArrayRecordReader) ReserveValues(extra int64, hasNullable bool)
error {
@@ -913,10 +917,10 @@ func newByteArrayDictRecordReader(descr *schema.Column,
info LevelInfo, dtype ar
dt.ValueType = arrow.BinaryTypes.Binary
}
- return &binaryRecordReader{&recordReader{
+ brr := &binaryRecordReader{&recordReader{
recordReaderImpl: &byteArrayDictRecordReader{
byteArrayRecordReader: byteArrayRecordReader{
- createPrimitiveRecordReader(descr, mem, bufferPool),
+ *createPrimitiveRecordReader(descr, mem, bufferPool),
array.NewDictionaryBuilder(mem, dt),
nil,
},
@@ -925,8 +929,10 @@ func newByteArrayDictRecordReader(descr *schema.Column,
info LevelInfo, dtype ar
leafInfo: info,
defLevels: memory.NewResizableBuffer(mem),
repLevels: memory.NewResizableBuffer(mem),
- refCount: 1,
}}
+
+ brr.refCount.Add(1)
+ return brr
}
func (bd *byteArrayDictRecordReader) GetBuilderChunks() []arrow.Array {
diff --git a/parquet/pqarrow/column_readers.go b/parquet/pqarrow/column_readers.go
index 5047d88..b013807 100644
--- a/parquet/pqarrow/column_readers.go
+++ b/parquet/pqarrow/column_readers.go
@@ -49,7 +49,7 @@ type leafReader struct {
recordRdr file.RecordReader
props ArrowReadProperties
- refCount int64
+ refCount atomic.Int64
}
func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator,
leafInfo file.LevelInfo, props ArrowReadProperties, bufferPool *sync.Pool)
(*ColumnReader, error) {
@@ -60,18 +60,19 @@ func newLeafReader(rctx *readerCtx, field *arrow.Field,
input *columnIterator, l
descr: input.Descr(),
recordRdr: file.NewRecordReader(input.Descr(), leafInfo,
field.Type, rctx.mem, bufferPool),
props: props,
- refCount: 1,
}
+ ret.refCount.Add(1)
+
err := ret.nextRowGroup()
return &ColumnReader{ret}, err
}
func (lr *leafReader) Retain() {
- atomic.AddInt64(&lr.refCount, 1)
+ lr.refCount.Add(1)
}
func (lr *leafReader) Release() {
- if atomic.AddInt64(&lr.refCount, -1) == 0 {
+ if lr.refCount.Add(-1) == 0 {
lr.releaseOut()
if lr.recordRdr != nil {
lr.recordRdr.Release()
@@ -165,15 +166,15 @@ type structReader struct {
hasRepeatedChild bool
props ArrowReadProperties
- refCount int64
+ refCount atomic.Int64
}
func (sr *structReader) Retain() {
- atomic.AddInt64(&sr.refCount, 1)
+ sr.refCount.Add(1)
}
func (sr *structReader) Release() {
- if atomic.AddInt64(&sr.refCount, -1) == 0 {
+ if sr.refCount.Add(-1) == 0 {
if sr.defRepLevelChild != nil {
sr.defRepLevelChild.Release()
sr.defRepLevelChild = nil
@@ -192,8 +193,8 @@ func newStructReader(rctx *readerCtx, filtered
*arrow.Field, levelInfo file.Leve
levelInfo: levelInfo,
children: children,
props: props,
- refCount: 1,
}
+ ret.refCount.Add(1)
// there could be a mix of children some might be repeated and some might not be
// if possible use one that isn't since that will be guaranteed to have the least
@@ -348,20 +349,24 @@ type listReader struct {
info file.LevelInfo
itemRdr *ColumnReader
props ArrowReadProperties
- refCount int64
+ refCount atomic.Int64
}
func newListReader(rctx *readerCtx, field *arrow.Field, info file.LevelInfo,
childRdr *ColumnReader, props ArrowReadProperties) *ColumnReader {
childRdr.Retain()
- return &ColumnReader{&listReader{rctx, field, info, childRdr, props, 1}}
+ lr := &listReader{rctx: rctx, field: field, info: info, itemRdr: childRdr, props: props}
+ lr.refCount.Add(1)
+ return &ColumnReader{
+ lr,
+ }
}
func (lr *listReader) Retain() {
- atomic.AddInt64(&lr.refCount, 1)
+ lr.refCount.Add(1)
}
func (lr *listReader) Release() {
- if atomic.AddInt64(&lr.refCount, -1) == 0 {
+ if lr.refCount.Add(-1) == 0 {
if lr.itemRdr != nil {
lr.itemRdr.Release()
lr.itemRdr = nil
@@ -473,7 +478,14 @@ type fixedSizeListReader struct {
func newFixedSizeListReader(rctx *readerCtx, field *arrow.Field, info
file.LevelInfo, childRdr *ColumnReader, props ArrowReadProperties)
*ColumnReader {
childRdr.Retain()
- return &ColumnReader{&fixedSizeListReader{listReader{rctx, field, info, childRdr, props, 1}}}
+ lr := &listReader{rctx: rctx, field: field, info: info, itemRdr: childRdr, props: props}
+ lr.refCount.Add(1)
+
+ return &ColumnReader{
+ &fixedSizeListReader{
+ *lr,
+ },
+ }
}
// helper function to combine chunks into a single array.
@@ -613,9 +625,7 @@ func transferBinary(rdr file.RecordReader, dt
arrow.DataType) *arrow.Chunked {
}
func transferInt(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData {
- var (
- output reflect.Value
- )
+ var output reflect.Value
signed := true
// create buffer for proper type since parquet only has int32 and int64
@@ -795,9 +805,7 @@ func transferDecimalInteger(rdr file.RecordReader, dt
arrow.DataType) arrow.Arra
}
func uint64FromBigEndianShifted(buf []byte) uint64 {
- var (
- bytes [8]byte
- )
+ var bytes [8]byte
copy(bytes[8-len(buf):], buf)
return binary.BigEndian.Uint64(bytes[:])
}
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index ab7dcec..df73613 100644
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -389,7 +389,6 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context,
indices, rowGroups []in
// if the context is in error, but we haven't set an error yet, then it means that the parent context
// was cancelled. In this case, we should exit early as some columns may not have been read yet.
err = errors.Join(err, ctx.Err())
-
if err != nil {
// if we encountered an error, consume any waiting data on the channel
// so the goroutines don't leak and so memory can get cleaned up. we already
@@ -491,14 +490,15 @@ func (fr *FileReader) GetRecordReader(ctx
context.Context, colIndices, rowGroups
if fr.Props.BatchSize <= 0 {
batchSize = nrows
}
- return &recordReader{
+ rr := &recordReader{
numRows: nrows,
batchSize: batchSize,
parallel: fr.Props.Parallel,
sc: sc,
fieldReaders: readers,
- refCount: 1,
- }, nil
+ }
+ rr.refCount.Add(1)
+ return rr, nil
}
func (fr *FileReader) getReader(ctx context.Context, field *SchemaField,
arrowField arrow.Field) (out *ColumnReader, err error) {
@@ -558,8 +558,10 @@ func (fr *FileReader) getReader(ctx context.Context, field
*SchemaField, arrowFi
if len(childFields) == 0 {
return nil, nil
}
- filtered := arrow.Field{Name: arrowField.Name, Nullable: arrowField.Nullable,
- Metadata: arrowField.Metadata, Type: arrow.StructOf(childFields...)}
+ filtered := arrow.Field{
+ Name: arrowField.Name, Nullable: arrowField.Nullable,
+ Metadata: arrowField.Metadata, Type: arrow.StructOf(childFields...),
+ }
out = newStructReader(&rctx, &filtered, field.LevelInfo,
childReaders, fr.Props)
case arrow.LIST, arrow.FIXED_SIZE_LIST, arrow.MAP:
child := field.Children[0]
@@ -682,7 +684,7 @@ type recordReader struct {
cur arrow.Record
err error
- refCount int64
+ refCount atomic.Int64
}
func (r *recordReader) SeekToRow(row int64) error {
@@ -705,11 +707,11 @@ func (r *recordReader) SeekToRow(row int64) error {
}
func (r *recordReader) Retain() {
- atomic.AddInt64(&r.refCount, 1)
+ r.refCount.Add(1)
}
func (r *recordReader) Release() {
- if atomic.AddInt64(&r.refCount, -1) == 0 {
+ if r.refCount.Add(-1) == 0 {
if r.cur != nil {
r.cur.Release()
r.cur = nil