This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new e3327330 fix(arrow/cdata): make nativeCRecordBatchReader deterministic
(#793)
e3327330 is described below
commit e33273301c2825b8ddf01b07946e8da4f85f50b6
Author: Matt Topol <[email protected]>
AuthorDate: Fri May 1 12:36:22 2026 -0400
fix(arrow/cdata): make nativeCRecordBatchReader deterministic (#793)
### Rationale for this change
Instead of relying on a finalizer, make the `nativeCRecordBatchReader`
use an atomic reference count in `Retain` and `Release` so that the
underlying C resources are released deterministically.
### What changes are included in this PR?
Replace the finalizer with `runtime.AddCleanup` fallbacks, and call
C.ArrowArrayRelease and C.ArrowArrayStreamRelease (then free the C
structs) as soon as the refcount drops to zero.
### Are these changes tested?
Yes, tests cover this already.
### Are there any user-facing changes?
Only that `Retain` and `Release` now deterministically control when the
C memory is released, instead of relying on a finalizer.
---
arrow/cdata/cdata.go | 49 +++++++++++++++++++++++++++++++++++-------------
arrow/cdata/interface.go | 2 ++
2 files changed, 38 insertions(+), 13 deletions(-)
diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go
index 352dfd9a..68c6f7e3 100644
--- a/arrow/cdata/cdata.go
+++ b/arrow/cdata/cdata.go
@@ -47,12 +47,14 @@ import (
"runtime"
"strconv"
"strings"
+ "sync/atomic"
"syscall"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/bitutil"
+ "github.com/apache/arrow-go/v18/arrow/internal/debug"
"github.com/apache/arrow-go/v18/arrow/memory"
)
@@ -903,18 +905,19 @@ func importCArrayAsType(arr *CArrowArray, dt
arrow.DataType) (imp *cimporter, er
}
func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream)
error {
+ rdr.refCount.Store(1)
rdr.stream = C.get_stream()
C.ArrowArrayStreamMove(stream, rdr.stream)
rdr.arr = C.get_arr()
- runtime.SetFinalizer(rdr, func(r *nativeCRecordBatchReader) {
- if r.cur != nil {
- r.cur.Release()
- }
- C.ArrowArrayStreamRelease(r.stream)
- C.ArrowArrayRelease(r.arr)
- C.free(unsafe.Pointer(r.stream))
- C.free(unsafe.Pointer(r.arr))
- })
+
+ rdr.cleanUps[0] = runtime.AddCleanup(rdr, func(s *CArrowArrayStream) {
+ C.ArrowArrayStreamRelease(s)
+ C.free(unsafe.Pointer(s))
+ }, rdr.stream)
+ rdr.cleanUps[1] = runtime.AddCleanup(rdr, func(a *CArrowArray) {
+ C.ArrowArrayRelease(a)
+ C.free(unsafe.Pointer(a))
+ }, rdr.arr)
var sc CArrowSchema
errno := C.stream_get_schema(rdr.stream, &sc)
@@ -940,12 +943,32 @@ type nativeCRecordBatchReader struct {
cur arrow.RecordBatch
err error
+
+ refCount atomic.Int64
+ cleanUps [2]runtime.Cleanup
+}
+
+func (n *nativeCRecordBatchReader) Retain() {
+ n.refCount.Add(1)
}
-// No need to implement retain and release here as we used
runtime.SetFinalizer when constructing
-// the reader to free up the ArrowArrayStream memory when the garbage
collector cleans it up.
-func (n *nativeCRecordBatchReader) Retain() {}
-func (n *nativeCRecordBatchReader) Release() {}
+func (n *nativeCRecordBatchReader) Release() {
+ rc := n.refCount.Add(-1)
+ debug.Assert(rc >= 0, "too many releases")
+
+ if rc == 0 {
+ n.cleanUps[0].Stop()
+ n.cleanUps[1].Stop()
+ if n.cur != nil {
+ n.cur.Release()
+ }
+
+ C.ArrowArrayStreamRelease(n.stream)
+ C.ArrowArrayRelease(n.arr)
+ C.free(unsafe.Pointer(n.stream))
+ C.free(unsafe.Pointer(n.arr))
+ }
+}
func (n *nativeCRecordBatchReader) Err() error { return
n.err }
func (n *nativeCRecordBatchReader) RecordBatch() arrow.RecordBatch { return
n.cur }
diff --git a/arrow/cdata/interface.go b/arrow/cdata/interface.go
index f776d7f7..a3690662 100644
--- a/arrow/cdata/interface.go
+++ b/arrow/cdata/interface.go
@@ -189,8 +189,10 @@ func ImportCArrayStream(stream *CArrowArrayStream, schema
*arrow.Schema) arrio.R
func ImportCRecordReader(stream *CArrowArrayStream, schema *arrow.Schema)
(arrio.Reader, error) {
out := &nativeCRecordBatchReader{schema: schema}
if err := initReader(out, stream); err != nil {
+ out.Release()
return nil, err
}
+
return out, nil
}