This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new a97bd0bf fix(parquet/pqarrow): normalize the element name in the
stored ARROW:schema (#746)
a97bd0bf is described below
commit a97bd0bf6fb1312121aa6969bd2efbbe9a54fffa
Author: Matt Topol <[email protected]>
AuthorDate: Fri Apr 17 13:30:37 2026 -0400
fix(parquet/pqarrow): normalize the element name in the stored ARROW:schema
(#746)
### Rationale for this change
closes #744
### What changes are included in this PR?
Normalizes the element name of list fields in the stored ARROW:schema of
a parquet file to avoid mismatches with the Parquet column paths (which always use "element").
### Are these changes tested?
Yes, a new unit test is added.
### Are there any user-facing changes?
Yes, this is a bug fix that ensures consistency between the stored ARROW:schema and the Parquet column paths.
---
parquet/pqarrow/encode_arrow_test.go | 104 ++++++++++++++++++++++++++++++++++-
parquet/pqarrow/file_writer.go | 80 ++++++++++++++++++++++++++-
parquet/pqarrow/schema.go | 4 +-
parquet/schema/node.go | 2 +-
4 files changed, 184 insertions(+), 6 deletions(-)
diff --git a/parquet/pqarrow/encode_arrow_test.go
b/parquet/pqarrow/encode_arrow_test.go
index b60a960c..10ff5074 100644
--- a/parquet/pqarrow/encode_arrow_test.go
+++ b/parquet/pqarrow/encode_arrow_test.go
@@ -19,6 +19,7 @@ package pqarrow_test
import (
"bytes"
"context"
+ "encoding/base64"
"encoding/binary"
"fmt"
"math"
@@ -1532,9 +1533,9 @@ func makeListArray(values arrow.Array, size, nullcount
int) arrow.Array {
nullBitmap := make([]byte, int(bitutil.BytesForBits(int64(size))))
curOffset := 0
- for i := 0; i < size; i++ {
+ for i := range size {
offsetsArr[i] = int32(curOffset)
- if !(((i % 2) == 0) && ((i / 2) < nullcount)) {
+ if i%2 != 0 || i/2 >= nullcount {
// non-null list (list with index 1 is always empty)
bitutil.SetBit(nullBitmap, i)
if i != 1 {
@@ -2108,6 +2109,105 @@ func (ps *ParquetIOTestSuite)
TestStructWithListOfNestedStructs() {
ps.roundTripTable(mem, expected, false)
}
+// TestListOfStructWithEmptyListStoreSchema tests that ARROW:schema metadata
stored
+// in a Parquet file uses "element" (not "item") as the list element field
name, to
+// match the actual Parquet column paths. This is required for compatibility
with
+// readers like Snowflake that resolve columns by matching ARROW:schema field
names
+// to Parquet column path segments. See
https://github.com/apache/arrow-go/issues/744.
+func TestListOfStructWithEmptyListStoreSchema(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ opsStruct := arrow.StructOf(
+ arrow.Field{Name: "id", Type: arrow.BinaryTypes.String,
Nullable: false},
+ arrow.Field{Name: "token", Type: arrow.BinaryTypes.String,
Nullable: true},
+ arrow.Field{Name: "amount", Type: arrow.BinaryTypes.String,
Nullable: true},
+ )
+ // arrow.ListOf uses "item" as the element field name, which would
mismatch
+ // the Parquet column path that uses "element". The fix ensures the
stored
+ // ARROW:schema uses "element" to stay consistent with the Parquet
columns.
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "block_num", Type: arrow.PrimitiveTypes.Uint64,
Nullable: false},
+ {Name: "tx_id", Type: arrow.BinaryTypes.String, Nullable:
false},
+ {Name: "ops", Type: arrow.ListOf(opsStruct), Nullable: true},
+ }, nil)
+
+ b := array.NewRecordBuilder(mem, schema)
+ defer b.Release()
+
+ b.Field(0).(*array.Uint64Builder).AppendValues([]uint64{100, 101, 102},
nil)
+ b.Field(1).(*array.StringBuilder).AppendValues([]string{"tx-a", "tx-b",
"tx-c"}, nil)
+
+ lb := b.Field(2).(*array.ListBuilder)
+ sb := lb.ValueBuilder().(*array.StructBuilder)
+ idb := sb.FieldBuilder(0).(*array.StringBuilder)
+ tokb := sb.FieldBuilder(1).(*array.StringBuilder)
+ amtb := sb.FieldBuilder(2).(*array.StringBuilder)
+
+ lb.Append(true)
+ sb.Append(true)
+ idb.Append("op-1")
+ tokb.Append("USDC")
+ amtb.Append("10")
+ sb.Append(true)
+ idb.Append("op-2")
+ tokb.Append("ETH")
+ amtb.Append("1.5")
+ lb.Append(true) // empty list
+ lb.Append(true)
+ sb.Append(true)
+ idb.Append("op-3")
+ tokb.AppendNull()
+ amtb.Append("42")
+
+ rec := b.NewRecordBatch()
+ defer rec.Release()
+
+ var buf bytes.Buffer
+ props :=
parquet.NewWriterProperties(parquet.WithDictionaryDefault(true),
parquet.WithStats(true))
+ arrowProps :=
pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())
+
+ pw, err := pqarrow.NewFileWriter(schema, &buf, props, arrowProps)
+ require.NoError(t, err)
+ require.NoError(t, pw.Write(rec))
+ require.NoError(t, pw.Close())
+
+ // Verify round-trip data is correct.
+ pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+ require.NoError(t, err)
+ defer pf.Close()
+
+ fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, mem)
+ require.NoError(t, err)
+
+ tbl, err := fr.ReadTable(context.Background())
+ require.NoError(t, err)
+ defer tbl.Release()
+
+ require.EqualValues(t, 3, tbl.NumRows())
+
+ // Verify the stored ARROW:schema uses "element" as the list element
field name
+ // (consistent with the Parquet column path "ops.list.element.*"), not
"item"
+ // (the default Arrow field name from arrow.ListOf()).
+ arrowSchemaEncoded :=
pf.MetaData().KeyValueMetadata().FindValue("ARROW:schema")
+ require.NotNil(t, arrowSchemaEncoded, "ARROW:schema metadata key must
be present")
+ decoded, err := base64.StdEncoding.DecodeString(*arrowSchemaEncoded)
+ require.NoError(t, err)
+ // DeserializeSchema wraps bytes in an IPC stream; use ipc.NewReader to
decode.
+ ipcRdr, err := ipc.NewReader(bytes.NewReader(decoded),
ipc.WithAllocator(mem))
+ require.NoError(t, err)
+ defer ipcRdr.Release()
+ storedSchema := ipcRdr.Schema()
+
+ opsField, ok := storedSchema.FieldsByName("ops")
+ require.True(t, ok)
+ opsListType, ok := opsField[0].Type.(*arrow.ListType)
+ require.True(t, ok)
+ // Must be "element" (matching Parquet column path) not "item" (Arrow
default).
+ assert.Equal(t, "element", opsListType.ElemField().Name,
+ "ARROW:schema element name must match the Parquet column path
segment")
+}
+
func TestParquetArrowIO(t *testing.T) {
suite.Run(t, new(ParquetIOTestSuite))
}
diff --git a/parquet/pqarrow/file_writer.go b/parquet/pqarrow/file_writer.go
index dc2c90be..ff490b34 100644
--- a/parquet/pqarrow/file_writer.go
+++ b/parquet/pqarrow/file_writer.go
@@ -33,6 +33,74 @@ import (
"github.com/apache/arrow-go/v18/parquet/metadata"
)
+// normalizeFieldForParquet recursively normalizes an Arrow field so that its
+// type matches the Parquet column structure that fieldToNode would produce.
+// Specifically, list element field names are set to "element" because
+// ListOfWithName (used by fieldToNode) always names the Parquet element group
+// "element", regardless of the original Arrow element field name.
+func normalizeFieldForParquet(f arrow.Field) (arrow.Field, error) {
+ switch dt := f.Type.(type) {
+ case *arrow.ListType:
+ elem, err := normalizeFieldForParquet(dt.ElemField())
+ if err != nil {
+ return arrow.Field{}, err
+ }
+ elem.Name = "element"
+ return arrow.Field{Name: f.Name, Type: arrow.ListOfField(elem),
Nullable: f.Nullable, Metadata: f.Metadata}, nil
+ case *arrow.FixedSizeListType:
+ elem, err := normalizeFieldForParquet(dt.ElemField())
+ if err != nil {
+ return arrow.Field{}, err
+ }
+ elem.Name = "element"
+ return arrow.Field{Name: f.Name, Type:
arrow.FixedSizeListOfField(dt.Len(), elem), Nullable: f.Nullable, Metadata:
f.Metadata}, nil
+ case *arrow.StructType:
+ fields := make([]arrow.Field, dt.NumFields())
+ for i := 0; i < dt.NumFields(); i++ {
+ field, err := normalizeFieldForParquet(dt.Field(i))
+ if err != nil {
+ return arrow.Field{}, err
+ }
+ fields[i] = field
+ }
+ return arrow.Field{Name: f.Name, Type:
arrow.StructOf(fields...), Nullable: f.Nullable, Metadata: f.Metadata}, nil
+ case *arrow.MapType:
+ key, err := normalizeFieldForParquet(dt.KeyField())
+ if err != nil {
+ return arrow.Field{}, err
+ }
+ item, err := normalizeFieldForParquet(dt.ItemField())
+ if err != nil {
+ return arrow.Field{}, err
+ }
+ return arrow.Field{Name: f.Name, Type: arrow.MapOfFields(key,
item), Nullable: f.Nullable, Metadata: f.Metadata}, nil
+ case *arrow.RunEndEncodedType:
+ return arrow.Field{}, fmt.Errorf("RunEndEncoded types are not
supported for writing to Parquet files: field %s", f.Name)
+ case *arrow.ListViewType:
+ return arrow.Field{}, fmt.Errorf("ListView types are not
supported for writing to Parquet files: field %s", f.Name)
+ case *arrow.LargeListViewType:
+ return arrow.Field{}, fmt.Errorf("LargeListView types are not
supported for writing to Parquet files: field %s", f.Name)
+ }
+ return f, nil
+}
+
+// normalizeSchemaForParquet returns a copy of the Arrow schema with list
element
+// field names updated to "element" to match the Parquet column paths produced
by
+// fieldToNode. This is used when storing the ARROW:schema metadata to ensure
+// consistency between the stored schema and the actual Parquet column
structure.
+func normalizeSchemaForParquet(sc *arrow.Schema) (*arrow.Schema, error) {
+ fields := make([]arrow.Field, sc.NumFields())
+ for i, f := range sc.Fields() {
+ field, err := normalizeFieldForParquet(f)
+ if err != nil {
+ return nil, err
+ }
+ fields[i] = field
+ }
+ meta := sc.Metadata()
+ return arrow.NewSchema(fields, &meta), nil
+}
+
// WriteTable is a convenience function to create and write a full array.Table
to a parquet file. The schema
// and columns will be determined by the schema of the table, writing the file
out to the provided writer.
// The chunksize will be utilized in order to determine the size of the row
groups.
@@ -80,7 +148,17 @@ func NewFileWriter(arrschema *arrow.Schema, w io.Writer,
props *parquet.WriterPr
}
if arrprops.storeSchema {
- serializedSchema := flight.SerializeSchema(arrschema,
props.Allocator())
+ // Normalize the Arrow schema so that list element field names
match the
+ // Parquet column group names. fieldToNode always uses
"element" as the
+ // Parquet group name for list element fields (via
ListOfWithName), but
+ // arrow.ListOf() uses "item" as the Arrow element field name.
This
+ // inconsistency causes readers (e.g. Snowflake) that map
ARROW:schema field
+ // names to Parquet column paths to fail to locate the correct
columns.
+ schemaToStore, err := normalizeSchemaForParquet(arrschema)
+ if err != nil {
+ return nil, err
+ }
+ serializedSchema := flight.SerializeSchema(schemaToStore,
props.Allocator())
meta.Append("ARROW:schema",
base64.StdEncoding.EncodeToString(serializedSchema))
}
diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go
index fa2246f3..cfa0574b 100644
--- a/parquet/pqarrow/schema.go
+++ b/parquet/pqarrow/schema.go
@@ -299,7 +299,7 @@ func fieldToNode(name string, field arrow.Field, props
*parquet.WriterProperties
}
case arrow.STRUCT:
return structToNode(field, props, arrprops)
- case arrow.FIXED_SIZE_LIST, arrow.LIST:
+ case arrow.FIXED_SIZE_LIST, arrow.LIST, arrow.LARGE_LIST:
elemField := field.Type.(arrow.ListLikeType).ElemField()
child, err := fieldToNode(name, elemField, props, arrprops)
@@ -722,7 +722,7 @@ func listToSchemaField(n *schema.GroupNode, currentLevels
file.LevelInfo, ctx *s
// If the name is array or ends in _tuple, this should be a
list of struct
// even for single child elements.
listGroup := listNode.(*schema.GroupNode)
- if listGroup.NumFields() == 1 && !(listGroup.Name() == "array"
|| listGroup.Name() == (n.Name()+"_tuple")) {
+ if listGroup.NumFields() == 1 && (listGroup.Name() != "array"
&& listGroup.Name() != n.Name()+"_tuple") {
// list of primitive type
if err := nodeToSchemaField(listGroup.Field(0),
currentLevels, ctx, out, &out.Children[0]); err != nil {
return err
diff --git a/parquet/schema/node.go b/parquet/schema/node.go
index 930c9a88..f95c6164 100644
--- a/parquet/schema/node.go
+++ b/parquet/schema/node.go
@@ -172,7 +172,7 @@ func NewPrimitiveNodeLogical(name string, repetition
parquet.Repetition, logical
n.convertedType, n.decimalMetaData =
n.logicalType.ToConvertedType()
}
- if !(n.logicalType != nil && !n.logicalType.IsNested() &&
n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData)) {
+ if n.logicalType == nil || n.logicalType.IsNested() ||
!n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData) {
return nil, fmt.Errorf("invalid logical type %s", n.logicalType)
}