This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b975977653 GH-36669: [Go] Guard against garbage in C Data structures
(#36670)
b975977653 is described below
commit b9759776535583549a9f3ef4cb5b20f44f42f8aa
Author: David Li <[email protected]>
AuthorDate: Thu Jul 13 14:09:41 2023 -0400
GH-36669: [Go] Guard against garbage in C Data structures (#36670)
### Rationale for this change
Prevent hard to debug crashes when using Go code with other code via C Data
Interface.
### What changes are included in this PR?
In the C Stream Interface implementation, jump through a trampoline that
zeroes the out parameters before letting Go see them.
Note that this can only guard against the issue when the C Stream Interface
is used.
Also, fix other issues in the C Data Interface tests with invalid pointers
and uninitialized memory that were turned up by the new test here (because it
calls `runtime.GC` very frequently).
### Are these changes tested?
Yes
### Are there any user-facing changes?
No
**This PR contains a "Critical Fix".**
* Closes: #36669
Lead-authored-by: David Li <[email protected]>
Co-authored-by: Matt Topol <[email protected]>
Signed-off-by: David Li <[email protected]>
---
go/arrow/cdata/cdata_exports.go | 10 +++++-----
go/arrow/cdata/cdata_fulltest.c | 33 +++++++++++++++++++++++++++++++++
go/arrow/cdata/cdata_test.go | 26 ++++++++++++++++++++++++++
go/arrow/cdata/cdata_test_framework.go | 34 +++++++++++++++++++++++++++-------
go/arrow/cdata/exports.go | 18 +++++++++++-------
go/arrow/cdata/interface.go | 20 ++++++++++++++++++++
go/arrow/cdata/trampoline.c | 34 ++++++++++++++++++++++++++++++++++
7 files changed, 156 insertions(+), 19 deletions(-)
diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go
index 7b2f10ea66..dae9f5fefe 100644
--- a/go/arrow/cdata/cdata_exports.go
+++ b/go/arrow/cdata/cdata_exports.go
@@ -283,7 +283,7 @@ func (exp *schemaExporter) export(field arrow.Field) {
func allocateArrowSchemaArr(n int) (out []CArrowSchema) {
s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
- s.Data = uintptr(C.malloc(C.sizeof_struct_ArrowSchema * C.size_t(n)))
+ s.Data = uintptr(C.calloc(C.size_t(n), C.sizeof_struct_ArrowSchema))
s.Len = n
s.Cap = n
@@ -292,7 +292,7 @@ func allocateArrowSchemaArr(n int) (out []CArrowSchema) {
func allocateArrowSchemaPtrArr(n int) (out []*CArrowSchema) {
s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
- s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*CArrowSchema)(nil)))
* C.size_t(n)))
+ s.Data = uintptr(C.calloc(C.size_t(n),
C.size_t(unsafe.Sizeof((*CArrowSchema)(nil)))))
s.Len = n
s.Cap = n
@@ -301,7 +301,7 @@ func allocateArrowSchemaPtrArr(n int) (out []*CArrowSchema)
{
func allocateArrowArrayArr(n int) (out []CArrowArray) {
s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
- s.Data = uintptr(C.malloc(C.sizeof_struct_ArrowArray * C.size_t(n)))
+ s.Data = uintptr(C.calloc(C.size_t(n), C.sizeof_struct_ArrowArray))
s.Len = n
s.Cap = n
@@ -310,7 +310,7 @@ func allocateArrowArrayArr(n int) (out []CArrowArray) {
func allocateArrowArrayPtrArr(n int) (out []*CArrowArray) {
s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
- s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*CArrowArray)(nil)))
* C.size_t(n)))
+ s.Data = uintptr(C.calloc(C.size_t(n),
C.size_t(unsafe.Sizeof((*CArrowArray)(nil)))))
s.Len = n
s.Cap = n
@@ -319,7 +319,7 @@ func allocateArrowArrayPtrArr(n int) (out []*CArrowArray) {
func allocateBufferPtrArr(n int) (out []*C.void) {
s := (*reflect.SliceHeader)(unsafe.Pointer(&out))
- s.Data = uintptr(C.malloc(C.size_t(unsafe.Sizeof((*C.void)(nil))) *
C.size_t(n)))
+ s.Data = uintptr(C.calloc(C.size_t(n),
C.size_t(unsafe.Sizeof((*C.void)(nil)))))
s.Len = n
s.Cap = n
diff --git a/go/arrow/cdata/cdata_fulltest.c b/go/arrow/cdata/cdata_fulltest.c
index b85e1e8310..7aed597942 100644
--- a/go/arrow/cdata/cdata_fulltest.c
+++ b/go/arrow/cdata/cdata_fulltest.c
@@ -404,6 +404,7 @@ void setup_array_stream_test(const int n_batches, struct
ArrowArrayStream* out)
int test_exported_stream(struct ArrowArrayStream* stream) {
while (1) {
struct ArrowArray array;
+ memset(&array, 0, sizeof(array));
// Garbage - implementation should not try to call it, though!
array.release = (void*)0xDEADBEEF;
int rc = stream->get_next(stream, &array);
@@ -447,3 +448,35 @@ void test_stream_schema_fallible(struct ArrowArrayStream*
stream) {
stream->private_data = &kFallibleStream;
stream->release = FallibleRelease;
}
+
+int confuse_go_gc(struct ArrowArrayStream* stream, unsigned int seed) {
+ struct ArrowSchema schema;
+ // Try to confuse the Go GC by putting what looks like a Go pointer here.
+#ifdef _WIN32
+ // Thread-safe on Windows with the multithread CRT
+#define DORAND rand()
+#else
+#define DORAND rand_r(&seed)
+#endif
+ schema.name = (char*)(0xc000000000L + (DORAND % 0x2000));
+ schema.format = (char*)(0xc000000000L + (DORAND % 0x2000));
+ int rc = stream->get_schema(stream, &schema);
+ if (rc != 0) return rc;
+ schema.release(&schema);
+
+ while (1) {
+ struct ArrowArray array;
+ array.release = (void*)(0xc000000000L + (DORAND % 0x2000));
+ array.private_data = (void*)(0xc000000000L + (DORAND % 0x2000));
+ int rc = stream->get_next(stream, &array);
+ if (rc != 0) return rc;
+
+ if (array.release == NULL) {
+ stream->release(stream);
+ break;
+ }
+ array.release(&array);
+ }
+ return 0;
+#undef DORAND
+}
diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go
index f336dec370..f4c09000cb 100644
--- a/go/arrow/cdata/cdata_test.go
+++ b/go/arrow/cdata/cdata_test.go
@@ -29,6 +29,7 @@ import (
"io"
"runtime"
"runtime/cgo"
+ "sync"
"testing"
"time"
"unsafe"
@@ -940,3 +941,28 @@ func TestRecordReaderImportError(t *testing.T) {
}
assert.Contains(t, err.Error(), "Expected error message")
}
+
+func TestConfuseGoGc(t *testing.T) {
+ // Regression test for https://github.com/apache/arrow-adbc/issues/729
+ reclist := arrdata.Records["primitives"]
+
+ var wg sync.WaitGroup
+ concurrency := 32
+ wg.Add(concurrency)
+
+ // XXX: this test is a bit expensive
+ for i := 0; i < concurrency; i++ {
+ go func() {
+ for i := 0; i < 256; i++ {
+ rdr, err :=
array.NewRecordReader(reclist[0].Schema(), reclist)
+ assert.NoError(t, err)
+ runtime.GC()
+ assert.NoError(t, confuseGoGc(rdr))
+ runtime.GC()
+ }
+ wg.Done()
+ }()
+ }
+
+ wg.Wait()
+}
diff --git a/go/arrow/cdata/cdata_test_framework.go
b/go/arrow/cdata/cdata_test_framework.go
index fb61229641..c731c730c6 100644
--- a/go/arrow/cdata/cdata_test_framework.go
+++ b/go/arrow/cdata/cdata_test_framework.go
@@ -21,11 +21,16 @@ package cdata
// #include <stdlib.h>
// #include <stdint.h>
+// #include <string.h>
// #include "arrow/c/abi.h"
// #include "arrow/c/helpers.h"
//
// void setup_array_stream_test(const int n_batches, struct ArrowArrayStream*
out);
-// struct ArrowArray* get_test_arr() { return (struct
ArrowArray*)(malloc(sizeof(struct ArrowArray))); }
+// struct ArrowArray* get_test_arr() {
+// struct ArrowArray* array = (struct ArrowArray*)malloc(sizeof(struct
ArrowArray));
+// memset(array, 0, sizeof(*array));
+// return array;
+// }
// struct ArrowArrayStream* get_test_stream() {
// struct ArrowArrayStream* out = (struct
ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream));
// memset(out, 0, sizeof(struct ArrowArrayStream));
@@ -56,11 +61,13 @@ package cdata
// struct ArrowSchema** test_union(const char** fmts, const char** names,
int64_t* flags, const int n);
// int test_exported_stream(struct ArrowArrayStream* stream);
// void test_stream_schema_fallible(struct ArrowArrayStream* stream);
+// int confuse_go_gc(struct ArrowArrayStream* stream, unsigned int seed);
import "C"
import (
"errors"
"fmt"
"io"
+ "math/rand"
"unsafe"
"github.com/apache/arrow/go/v13/arrow"
@@ -271,15 +278,17 @@ func createCArr(arr arrow.Array) *CArrowArray {
carr.null_count = C.int64_t(arr.NullN())
carr.offset = C.int64_t(arr.Data().Offset())
buffers := arr.Data().Buffers()
- cbuf := []unsafe.Pointer{}
- for _, b := range buffers {
+ cbufs := allocateBufferPtrArr(len(buffers))
+ for i, b := range buffers {
if b != nil {
- cbuf = append(cbuf, C.CBytes(b.Bytes()))
+ cbufs[i] = (*C.void)(C.CBytes(b.Bytes()))
+ } else {
+ cbufs[i] = nil
}
}
- carr.n_buffers = C.int64_t(len(cbuf))
- if len(cbuf) > 0 {
- carr.buffers = &cbuf[0]
+ carr.n_buffers = C.int64_t(len(cbufs))
+ if len(cbufs) > 0 {
+ carr.buffers = (*unsafe.Pointer)(unsafe.Pointer(&cbufs[0]))
}
carr.release = (*[0]byte)(C.release_test_arr)
@@ -350,3 +359,14 @@ func fallibleSchemaTest() error {
}
return nil
}
+
+func confuseGoGc(reader array.RecordReader) error {
+ out := C.get_test_stream()
+ ExportRecordReader(reader, out)
+ rc := C.confuse_go_gc(out, C.uint(rand.Int()))
+ C.free(unsafe.Pointer(out))
+ if rc == 0 {
+ return nil
+ }
+ return fmt.Errorf("Exported stream test failed with return code %d",
int(rc))
+}
diff --git a/go/arrow/cdata/exports.go b/go/arrow/cdata/exports.go
index 2bbd45e58a..118dec2c38 100644
--- a/go/arrow/cdata/exports.go
+++ b/go/arrow/cdata/exports.go
@@ -28,11 +28,14 @@ import (
// #include <stdlib.h>
// #include "arrow/c/helpers.h"
//
-// typedef const char cchar_t;
-// extern int streamGetSchema(struct ArrowArrayStream*, struct
ArrowSchema*);
-// extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
-// extern const char* streamGetError(struct ArrowArrayStream*);
-// extern void streamRelease(struct ArrowArrayStream*);
+// typedef const char cchar_t;
+// extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
+// extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
+// extern const char* streamGetError(struct ArrowArrayStream*);
+// extern void streamRelease(struct ArrowArrayStream*);
+// // XXX(https://github.com/apache/arrow-adbc/issues/729)
+// int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct
ArrowSchema* out);
+// int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct
ArrowArray* out);
//
import "C"
@@ -154,10 +157,11 @@ func streamRelease(handle *CArrowArrayStream) {
}
func exportStream(rdr array.RecordReader, out *CArrowArrayStream) {
- out.get_schema = (*[0]byte)(C.streamGetSchema)
- out.get_next = (*[0]byte)(C.streamGetNext)
+ out.get_schema = (*[0]byte)(C.streamGetSchemaTrampoline)
+ out.get_next = (*[0]byte)(C.streamGetNextTrampoline)
out.get_last_error = (*[0]byte)(C.streamGetError)
out.release = (*[0]byte)(C.streamRelease)
+ rdr.Retain()
h := cgo.NewHandle(cRecordReader{rdr: rdr, err: nil})
out.private_data = createHandle(h)
}
diff --git a/go/arrow/cdata/interface.go b/go/arrow/cdata/interface.go
index 64b8176ad2..5040487800 100644
--- a/go/arrow/cdata/interface.go
+++ b/go/arrow/cdata/interface.go
@@ -198,6 +198,11 @@ func ImportCRecordReader(stream *CArrowArrayStream, schema
*arrow.Schema) (arrio
// the populating of the struct. Any memory allocated will be allocated using
malloc
// which means that it is invisible to the Go Garbage Collector and must be
freed manually
// using the callback on the CArrowSchema object.
+//
+// WARNING: the output ArrowSchema MUST BE ZERO INITIALIZED, or the Go garbage
collector
+// may error at runtime, due to CGO rules ("the current implementation may
sometimes
+// cause a runtime error if the contents of the C memory appear to be a Go
pointer").
+// You have been warned!
func ExportArrowSchema(schema *arrow.Schema, out *CArrowSchema) {
dummy := arrow.Field{Type: arrow.StructOf(schema.Fields()...),
Metadata: schema.Metadata()}
exportField(dummy, out)
@@ -220,6 +225,11 @@ func ExportArrowSchema(schema *arrow.Schema, out
*CArrowSchema) {
// The release function on the populated CArrowArray will properly decrease
the reference counts,
// and release the memory if the record has already been released. But since
this must be explicitly
// done, make sure it is released so that you do not create a memory leak.
+//
+// WARNING: the output ArrowArray MUST BE ZERO INITIALIZED, or the Go garbage
collector
+// may error at runtime, due to CGO rules ("the current implementation may
sometimes
+// cause a runtime error if the contents of the C memory appear to be a Go
pointer").
+// You have been warned!
func ExportArrowRecordBatch(rb arrow.Record, out *CArrowArray, outSchema
*CArrowSchema) {
children := make([]arrow.ArrayData, rb.NumCols())
for i := range rb.Columns() {
@@ -243,6 +253,11 @@ func ExportArrowRecordBatch(rb arrow.Record, out
*CArrowArray, outSchema *CArrow
// being used by the arrow.Array passed in, in order to share with zero-copy
across the C
// Data Interface. See the documentation for ExportArrowRecordBatch for
details on how to ensure
// you do not leak memory and prevent unwanted, undefined or strange behaviors.
+//
+// WARNING: the output ArrowArray MUST BE ZERO INITIALIZED, or the Go garbage
collector
+// may error at runtime, due to CGO rules ("the current implementation may
sometimes
+// cause a runtime error if the contents of the C memory appear to be a Go
pointer").
+// You have been warned!
func ExportArrowArray(arr arrow.Array, out *CArrowArray, outSchema
*CArrowSchema) {
exportArray(arr, out, outSchema)
}
@@ -252,6 +267,11 @@ func ExportArrowArray(arr arrow.Array, out *CArrowArray,
outSchema *CArrowSchema
// CArrowArrayStream takes ownership of the RecordReader until the consumer
calls the release
// callback, as such it is unnecesary to call Release on the passed in reader
unless it has
// previously been retained.
+//
+// WARNING: the output ArrowArrayStream MUST BE ZERO INITIALIZED, or the Go
garbage
+// collector may error at runtime, due to CGO rules ("the current
implementation may
+// sometimes cause a runtime error if the contents of the C memory appear to
be a Go
+// pointer"). You have been warned!
func ExportRecordReader(reader array.RecordReader, out *CArrowArrayStream) {
exportStream(reader, out)
}
diff --git a/go/arrow/cdata/trampoline.c b/go/arrow/cdata/trampoline.c
new file mode 100644
index 0000000000..01db13fab4
--- /dev/null
+++ b/go/arrow/cdata/trampoline.c
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string.h>
+
+#include "arrow/c/abi.h"
+
+int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
+int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
+
+int streamGetSchemaTrampoline(struct ArrowArrayStream* stream, struct
ArrowSchema* out) {
+ // XXX(https://github.com/apache/arrow-adbc/issues/729)
+ memset(out, 0, sizeof(*out));
+ return streamGetSchema(stream, out);
+}
+
+int streamGetNextTrampoline(struct ArrowArrayStream* stream, struct
ArrowArray* out) {
+ // XXX(https://github.com/apache/arrow-adbc/issues/729)
+ memset(out, 0, sizeof(*out));
+ return streamGetNext(stream, out);
+}