This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 24c66fa8 feat(arrow/ipc): support custom_metadata on RecordBatch
messages (#669)
24c66fa8 is described below
commit 24c66fa8bb79bd311be821627da0723a6428c84a
Author: Rusty Conover <[email protected]>
AuthorDate: Mon Feb 23 10:57:30 2026 -0500
feat(arrow/ipc): support custom_metadata on RecordBatch messages (#669)
### Rationale for this change
Arrow IPC Messages wrap RecordBatch data in a Message flatbuffer which
has a custom_metadata field (vector of KeyValue pairs). PyArrow and
other implementations use this to attach per-batch metadata, but the Go
implementation previously ignored it on both read and write paths.
### What changes are included in this PR?
Add a RecordBatchWithMetadata optional interface to avoid breaking the
existing RecordBatch interface. The simpleRecord implementation carries
metadata through NewSlice, SetColumn, and IPC round-trips. Includes
PyArrow-generated test fixtures for interoperability validation.
### Are these changes tested?
Yes, tests are included.
### Are there any user-facing changes?
Yes, custom_metadata is now supported on RecordBatch messages.
This change was developed with AI assistance and manually reviewed.
---
arrow/array/record.go | 21 +-
arrow/ipc/file_reader.go | 7 +-
arrow/ipc/ipc_test.go | 257 +++++++++++++++++++++++
arrow/ipc/metadata.go | 12 +-
arrow/ipc/testdata/custom_metadata.arrow | Bin 0 -> 1266 bytes
arrow/ipc/testdata/custom_metadata_stream.arrows | Bin 0 -> 944 bytes
arrow/ipc/writer.go | 11 +-
arrow/record.go | 8 +
8 files changed, 303 insertions(+), 13 deletions(-)
diff --git a/arrow/array/record.go b/arrow/array/record.go
index cca79661..0aaf771b 100644
--- a/arrow/array/record.go
+++ b/arrow/array/record.go
@@ -121,6 +121,7 @@ type simpleRecord struct {
refCount atomic.Int64
schema *arrow.Schema
+ meta arrow.Metadata
rows int64
arrs []arrow.Array
@@ -131,8 +132,19 @@ type simpleRecord struct {
// NewRecordBatch panics if the columns and schema are inconsistent.
// NewRecordBatch panics if rows is larger than the height of the columns.
func NewRecordBatch(schema *arrow.Schema, cols []arrow.Array, nrows int64)
arrow.RecordBatch {
+ return NewRecordBatchWithMetadata(schema, cols, nrows, arrow.Metadata{})
+}
+
+// NewRecordBatchWithMetadata returns a basic, non-lazy in-memory record batch
+// with custom metadata. The metadata is preserved during IPC serialization
+// at the Message level.
+//
+// NewRecordBatchWithMetadata panics if the columns and schema are
inconsistent.
+// NewRecordBatchWithMetadata panics if rows is larger than the height of the
columns.
+func NewRecordBatchWithMetadata(schema *arrow.Schema, cols []arrow.Array,
nrows int64, meta arrow.Metadata) arrow.RecordBatch {
rec := &simpleRecord{
schema: schema,
+ meta: meta,
rows: nrows,
arrs: make([]arrow.Array, len(cols)),
}
@@ -189,7 +201,7 @@ func (rec *simpleRecord) SetColumn(i int, arr arrow.Array)
(arrow.RecordBatch, e
copy(arrs, rec.arrs)
arrs[i] = arr
- return NewRecordBatch(rec.schema, arrs, rec.rows), nil
+ return NewRecordBatchWithMetadata(rec.schema, arrs, rec.rows,
rec.meta), nil
}
func (rec *simpleRecord) validate() error {
@@ -240,6 +252,7 @@ func (rec *simpleRecord) Release() {
}
func (rec *simpleRecord) Schema() *arrow.Schema { return rec.schema }
+func (rec *simpleRecord) Metadata() arrow.Metadata { return rec.meta }
func (rec *simpleRecord) NumRows() int64 { return rec.rows }
func (rec *simpleRecord) NumCols() int64 { return
int64(len(rec.arrs)) }
func (rec *simpleRecord) Columns() []arrow.Array { return rec.arrs }
@@ -262,7 +275,7 @@ func (rec *simpleRecord) NewSlice(i, j int64)
arrow.RecordBatch {
arr.Release()
}
}()
- return NewRecordBatch(rec.schema, arrs, j-i)
+ return NewRecordBatchWithMetadata(rec.schema, arrs, j-i, rec.meta)
}
func (rec *simpleRecord) String() string {
@@ -504,6 +517,6 @@ func IterFromReader(rdr RecordReader)
iter.Seq2[arrow.RecordBatch, error] {
}
var (
- _ arrow.RecordBatch = (*simpleRecord)(nil)
- _ RecordReader = (*simpleRecords)(nil)
+ _ arrow.RecordBatchWithMetadata = (*simpleRecord)(nil)
+ _ RecordReader = (*simpleRecords)(nil)
)
diff --git a/arrow/ipc/file_reader.go b/arrow/ipc/file_reader.go
index d742d852..605b768e 100644
--- a/arrow/ipc/file_reader.go
+++ b/arrow/ipc/file_reader.go
@@ -458,6 +458,11 @@ func newRecordBatch(schema *arrow.Schema, memo
*dictutils.Memo, meta *memory.Buf
defer codec.Close()
}
+ customMeta, err := metadataFromFB(msg)
+ if err != nil {
+ panic(err)
+ }
+
ctx := &arrayLoaderContext{
src: ipcSource{
meta: &md,
@@ -488,7 +493,7 @@ func newRecordBatch(schema *arrow.Schema, memo
*dictutils.Memo, meta *memory.Buf
defer cols[i].Release()
}
- return array.NewRecordBatch(schema, cols, rows)
+ return array.NewRecordBatchWithMetadata(schema, cols, rows, customMeta)
}
type ipcSource struct {
diff --git a/arrow/ipc/ipc_test.go b/arrow/ipc/ipc_test.go
index a23b3476..93bd987a 100644
--- a/arrow/ipc/ipc_test.go
+++ b/arrow/ipc/ipc_test.go
@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"math/rand"
+ "os"
"strconv"
"strings"
"testing"
@@ -688,3 +689,259 @@ func TestArrowBinaryIPCWriterTruncatedVOffsets(t
*testing.T) {
require.False(t, reader.Next())
}
+
+func TestRecordBatchCustomMetadataRoundtrip(t *testing.T) {
+ mem := memory.NewGoAllocator()
+ schema := arrow.NewSchema(
+ []arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
+ nil,
+ )
+
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+ bldr.AppendValues([]int32{1, 2, 3}, nil)
+ col := bldr.NewArray()
+ defer col.Release()
+
+ meta := arrow.NewMetadata([]string{"k1", "k2"}, []string{"v1", "v2"})
+ rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 3,
meta)
+ defer rec.Release()
+
+ // Write to IPC stream
+ var buf bytes.Buffer
+ writer := ipc.NewWriter(&buf, ipc.WithSchema(schema))
+ require.NoError(t, writer.Write(rec))
+ require.NoError(t, writer.Close())
+
+ // Read back
+ reader, err := ipc.NewReader(bytes.NewReader(buf.Bytes()))
+ require.NoError(t, err)
+ defer reader.Release()
+
+ require.True(t, reader.Next())
+ got := reader.RecordBatch()
+
+ rm, ok := got.(arrow.RecordBatchWithMetadata)
+ require.True(t, ok, "record batch should implement
RecordBatchWithMetadata")
+
+ require.Equal(t, meta.Keys(), rm.Metadata().Keys())
+ require.Equal(t, meta.Values(), rm.Metadata().Values())
+}
+
+func TestRecordBatchCustomMetadataFileRoundtrip(t *testing.T) {
+ mem := memory.NewGoAllocator()
+ schema := arrow.NewSchema(
+ []arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
+ nil,
+ )
+
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+ bldr.AppendValues([]int32{10, 20}, nil)
+ col := bldr.NewArray()
+ defer col.Release()
+
+ meta := arrow.NewMetadata([]string{"file_key"}, []string{"file_value"})
+ rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 2,
meta)
+ defer rec.Release()
+
+ // Write to IPC file
+ var buf bytes.Buffer
+ writer, err := ipc.NewFileWriter(&buf, ipc.WithSchema(schema))
+ require.NoError(t, err)
+ require.NoError(t, writer.Write(rec))
+ require.NoError(t, writer.Close())
+
+ // Read back
+ reader, err := ipc.NewFileReader(bytes.NewReader(buf.Bytes()))
+ require.NoError(t, err)
+ defer reader.Close()
+
+ require.Equal(t, 1, reader.NumRecords())
+ got, err := reader.RecordBatchAt(0)
+ require.NoError(t, err)
+ defer got.Release()
+
+ rm, ok := got.(arrow.RecordBatchWithMetadata)
+ require.True(t, ok, "record batch should implement
RecordBatchWithMetadata")
+
+ require.Equal(t, meta.Keys(), rm.Metadata().Keys())
+ require.Equal(t, meta.Values(), rm.Metadata().Values())
+}
+
+func TestRecordBatchCustomMetadataInterop(t *testing.T) {
+ t.Run("file", func(t *testing.T) {
+ f, err := os.Open("testdata/custom_metadata.arrow")
+ require.NoError(t, err)
+ defer f.Close()
+
+ reader, err := ipc.NewFileReader(f)
+ require.NoError(t, err)
+ defer reader.Close()
+
+ // Verify schema metadata
+ schemaMeta := reader.Schema().Metadata()
+ idx := schemaMeta.FindKey("schema_key")
+ require.GreaterOrEqual(t, idx, 0)
+ require.Equal(t, "schema_value", schemaMeta.Values()[idx])
+
+ require.Equal(t, 2, reader.NumRecords())
+
+ // Batch 1
+ rec0, err := reader.RecordBatchAt(0)
+ require.NoError(t, err)
+ defer rec0.Release()
+ rm0, ok := rec0.(arrow.RecordBatchWithMetadata)
+ require.True(t, ok)
+ m0 := rm0.Metadata()
+ require.Equal(t, "1", m0.Values()[m0.FindKey("batch_num")])
+ require.Equal(t, "value1", m0.Values()[m0.FindKey("key1")])
+
+ // Batch 2
+ rec1, err := reader.RecordBatchAt(1)
+ require.NoError(t, err)
+ defer rec1.Release()
+ rm1, ok := rec1.(arrow.RecordBatchWithMetadata)
+ require.True(t, ok)
+ m1 := rm1.Metadata()
+ require.Equal(t, "2", m1.Values()[m1.FindKey("batch_num")])
+ require.Equal(t, "value2", m1.Values()[m1.FindKey("key2")])
+ })
+
+ t.Run("stream", func(t *testing.T) {
+ data, err :=
os.ReadFile("testdata/custom_metadata_stream.arrows")
+ require.NoError(t, err)
+
+ reader, err := ipc.NewReader(bytes.NewReader(data))
+ require.NoError(t, err)
+ defer reader.Release()
+
+ // Verify schema metadata
+ schemaMeta := reader.Schema().Metadata()
+ idx := schemaMeta.FindKey("schema_key")
+ require.GreaterOrEqual(t, idx, 0)
+ require.Equal(t, "schema_value", schemaMeta.Values()[idx])
+
+ // Batch 1
+ require.True(t, reader.Next())
+ rec0 := reader.RecordBatch()
+ rm0, ok := rec0.(arrow.RecordBatchWithMetadata)
+ require.True(t, ok)
+ m0 := rm0.Metadata()
+ require.Equal(t, "1", m0.Values()[m0.FindKey("batch_num")])
+ require.Equal(t, "value1", m0.Values()[m0.FindKey("key1")])
+
+ // Batch 2
+ require.True(t, reader.Next())
+ rec1 := reader.RecordBatch()
+ rm1, ok := rec1.(arrow.RecordBatchWithMetadata)
+ require.True(t, ok)
+ m1 := rm1.Metadata()
+ require.Equal(t, "2", m1.Values()[m1.FindKey("batch_num")])
+ require.Equal(t, "value2", m1.Values()[m1.FindKey("key2")])
+
+ require.False(t, reader.Next())
+ })
+}
+
+func TestRecordBatchCustomMetadataSlice(t *testing.T) {
+ mem := memory.NewGoAllocator()
+ schema := arrow.NewSchema(
+ []arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
+ nil,
+ )
+
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+ bldr.AppendValues([]int32{1, 2, 3, 4}, nil)
+ col := bldr.NewArray()
+ defer col.Release()
+
+ meta := arrow.NewMetadata([]string{"slice_key"},
[]string{"slice_value"})
+ rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 4,
meta)
+ defer rec.Release()
+
+ sliced := rec.NewSlice(1, 3)
+ defer sliced.Release()
+
+ rm, ok := sliced.(arrow.RecordBatchWithMetadata)
+ require.True(t, ok, "sliced record should implement
RecordBatchWithMetadata")
+ require.Equal(t, meta.Keys(), rm.Metadata().Keys())
+ require.Equal(t, meta.Values(), rm.Metadata().Values())
+ require.EqualValues(t, 2, sliced.NumRows())
+}
+
+func TestRecordBatchCustomMetadataSetColumn(t *testing.T) {
+ mem := memory.NewGoAllocator()
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ {Name: "x", Type: arrow.PrimitiveTypes.Int32},
+ {Name: "y", Type: arrow.PrimitiveTypes.Int32},
+ },
+ nil,
+ )
+
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+ bldr.AppendValues([]int32{1, 2}, nil)
+ col1 := bldr.NewArray()
+ defer col1.Release()
+
+ bldr.AppendValues([]int32{3, 4}, nil)
+ col2 := bldr.NewArray()
+ defer col2.Release()
+
+ meta := arrow.NewMetadata([]string{"set_key"}, []string{"set_value"})
+ rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col1,
col2}, 2, meta)
+ defer rec.Release()
+
+ bldr.AppendValues([]int32{5, 6}, nil)
+ newCol := bldr.NewArray()
+ defer newCol.Release()
+
+ updated, err := rec.SetColumn(0, newCol)
+ require.NoError(t, err)
+ defer updated.Release()
+
+ rm, ok := updated.(arrow.RecordBatchWithMetadata)
+ require.True(t, ok, "updated record should implement
RecordBatchWithMetadata")
+ require.Equal(t, meta.Keys(), rm.Metadata().Keys())
+ require.Equal(t, meta.Values(), rm.Metadata().Values())
+}
+
+func TestRecordBatchNoMetadataRoundtrip(t *testing.T) {
+ mem := memory.NewGoAllocator()
+ schema := arrow.NewSchema(
+ []arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
+ nil,
+ )
+
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+ bldr.AppendValues([]int32{1, 2, 3}, nil)
+ col := bldr.NewArray()
+ defer col.Release()
+
+ rec := array.NewRecordBatch(schema, []arrow.Array{col}, 3)
+ defer rec.Release()
+
+ // Write to IPC stream
+ var buf bytes.Buffer
+ writer := ipc.NewWriter(&buf, ipc.WithSchema(schema))
+ require.NoError(t, writer.Write(rec))
+ require.NoError(t, writer.Close())
+
+ // Read back
+ reader, err := ipc.NewReader(bytes.NewReader(buf.Bytes()))
+ require.NoError(t, err)
+ defer reader.Release()
+
+ require.True(t, reader.Next())
+ got := reader.RecordBatch()
+
+ rm, ok := got.(arrow.RecordBatchWithMetadata)
+ require.True(t, ok, "record batch should implement
RecordBatchWithMetadata")
+
+ // Metadata should be empty, not nil
+ require.Equal(t, 0, rm.Metadata().Len())
+}
diff --git a/arrow/ipc/metadata.go b/arrow/ipc/metadata.go
index b83c1a84..91f37af7 100644
--- a/arrow/ipc/metadata.go
+++ b/arrow/ipc/metadata.go
@@ -1141,13 +1141,15 @@ func writeFBBuilder(b *flatbuffers.Builder, mem
memory.Allocator) *memory.Buffer
return buf
}
-func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType
flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64) *memory.Buffer {
+func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType
flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64, customMetadata
arrow.Metadata) *memory.Buffer {
+ metaFB := metadataToFB(b, customMetadata,
flatbuf.MessageStartCustomMetadataVector)
flatbuf.MessageStart(b)
flatbuf.MessageAddVersion(b,
flatbuf.MetadataVersion(currentMetadataVersion))
flatbuf.MessageAddHeaderType(b, hdrType)
flatbuf.MessageAddHeader(b, hdr)
flatbuf.MessageAddBodyLength(b, bodyLen)
+ flatbuf.MessageAddCustomMetadata(b, metaFB)
msg := flatbuf.MessageEnd(b)
b.Finish(msg)
@@ -1157,7 +1159,7 @@ func writeMessageFB(b *flatbuffers.Builder, mem
memory.Allocator, hdrType flatbu
func writeSchemaMessage(schema *arrow.Schema, mem memory.Allocator, dict
*dictutils.Mapper) *memory.Buffer {
b := flatbuffers.NewBuilder(1024)
schemaFB := schemaToFB(b, schema, dict)
- return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0)
+ return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0,
arrow.Metadata{})
}
func writeFileFooter(schema *arrow.Schema, dicts, recs []dataBlock, w
io.Writer) error {
@@ -1184,10 +1186,10 @@ func writeFileFooter(schema *arrow.Schema, dicts, recs
[]dataBlock, w io.Writer)
return err
}
-func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields
[]fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType,
variadicCounts []int64) *memory.Buffer {
+func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields
[]fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType,
variadicCounts []int64, customMetadata arrow.Metadata) *memory.Buffer {
b := flatbuffers.NewBuilder(0)
recFB := recordToFB(b, size, bodyLength, fields, meta, codec,
variadicCounts)
- return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB,
bodyLength)
+ return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB,
bodyLength, customMetadata)
}
func writeDictionaryMessage(mem memory.Allocator, id int64, isDelta bool,
size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec
flatbuf.CompressionType, variadicCounts []int64) *memory.Buffer {
@@ -1199,7 +1201,7 @@ func writeDictionaryMessage(mem memory.Allocator, id
int64, isDelta bool, size,
flatbuf.DictionaryBatchAddData(b, recFB)
flatbuf.DictionaryBatchAddIsDelta(b, isDelta)
dictFB := flatbuf.DictionaryBatchEnd(b)
- return writeMessageFB(b, mem, flatbuf.MessageHeaderDictionaryBatch,
dictFB, bodyLength)
+ return writeMessageFB(b, mem, flatbuf.MessageHeaderDictionaryBatch,
dictFB, bodyLength, arrow.Metadata{})
}
func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields
[]fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType,
variadicCounts []int64) flatbuffers.UOffsetT {
diff --git a/arrow/ipc/testdata/custom_metadata.arrow
b/arrow/ipc/testdata/custom_metadata.arrow
new file mode 100644
index 00000000..c553dbc2
Binary files /dev/null and b/arrow/ipc/testdata/custom_metadata.arrow differ
diff --git a/arrow/ipc/testdata/custom_metadata_stream.arrows
b/arrow/ipc/testdata/custom_metadata_stream.arrows
new file mode 100644
index 00000000..0aeef248
Binary files /dev/null and b/arrow/ipc/testdata/custom_metadata_stream.arrows
differ
diff --git a/arrow/ipc/writer.go b/arrow/ipc/writer.go
index 29646ae2..fa5db526 100644
--- a/arrow/ipc/writer.go
+++ b/arrow/ipc/writer.go
@@ -1035,11 +1035,16 @@ func (w *recordEncoder) Encode(p *Payload, rec
arrow.RecordBatch) error {
if err := w.encode(p, rec); err != nil {
return err
}
- return w.encodeMetadata(p, rec.NumRows())
+
+ var customMeta arrow.Metadata
+ if rm, ok := rec.(arrow.RecordBatchWithMetadata); ok {
+ customMeta = rm.Metadata()
+ }
+ return w.encodeMetadata(p, rec.NumRows(), customMeta)
}
-func (w *recordEncoder) encodeMetadata(p *Payload, nrows int64) error {
- p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta,
w.codec, w.variadicCounts)
+func (w *recordEncoder) encodeMetadata(p *Payload, nrows int64, customMetadata
arrow.Metadata) error {
+ p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta,
w.codec, w.variadicCounts, customMetadata)
return nil
}
diff --git a/arrow/record.go b/arrow/record.go
index 010dbe95..77ea1e18 100644
--- a/arrow/record.go
+++ b/arrow/record.go
@@ -48,6 +48,14 @@ type RecordBatch interface {
NewSlice(i, j int64) RecordBatch
}
+// RecordBatchWithMetadata is an optional interface for RecordBatch
+// implementations that support custom metadata. This metadata is
+// stored at the Message level in Arrow IPC format.
+type RecordBatchWithMetadata interface {
+ RecordBatch
+ Metadata() Metadata
+}
+
// Record as a term typically refers to a single row, but this type represents
a batch of rows, known in Arrow parlance
// as a RecordBatch. This alias is provided for backwards compatibility.
//