This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new a97bd0bf fix(parquet/pqarrow): normalize the element name in the stored ARROW:schema (#746)
a97bd0bf is described below

commit a97bd0bf6fb1312121aa6969bd2efbbe9a54fffa
Author: Matt Topol <[email protected]>
AuthorDate: Fri Apr 17 13:30:37 2026 -0400

    fix(parquet/pqarrow): normalize the element name in the stored ARROW:schema (#746)
    
    ### Rationale for this change
    closes #744
    
    ### What changes are included in this PR?
    Normalizes the element name of list fields in the stored ARROW:schema of
    a Parquet file to "element", avoiding mismatches with the file's actual
    column paths.
    
    ### Are these changes tested?
    Yes, a new unit test is added.
    
    ### Are there any user-facing changes?
    Yes: this is a bug fix that keeps the stored ARROW:schema consistent
    with the Parquet column structure.
---
 parquet/pqarrow/encode_arrow_test.go | 104 ++++++++++++++++++++++++++++++++++-
 parquet/pqarrow/file_writer.go       |  80 ++++++++++++++++++++++++++-
 parquet/pqarrow/schema.go            |   4 +-
 parquet/schema/node.go               |   2 +-
 4 files changed, 184 insertions(+), 6 deletions(-)
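
For context, a minimal sketch of the naming mismatch being fixed here, using
only the public arrow package (the example list and the printed values are
illustrative, not taken from the diff):

    package main

    import (
            "fmt"

            "github.com/apache/arrow-go/v18/arrow"
    )

    func main() {
            // arrow.ListOf names the element field "item" by default...
            list := arrow.ListOf(arrow.PrimitiveTypes.Int64)
            fmt.Println(list.ElemField().Name) // prints "item"

            // ...while pqarrow writes the Parquet column path with an
            // "element" group (e.g. "ops.list.element"), so a stored
            // ARROW:schema carrying "item" cannot be matched against the
            // column path segments by name.
    }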

diff --git a/parquet/pqarrow/encode_arrow_test.go b/parquet/pqarrow/encode_arrow_test.go
index b60a960c..10ff5074 100644
--- a/parquet/pqarrow/encode_arrow_test.go
+++ b/parquet/pqarrow/encode_arrow_test.go
@@ -19,6 +19,7 @@ package pqarrow_test
 import (
        "bytes"
        "context"
+       "encoding/base64"
        "encoding/binary"
        "fmt"
        "math"
@@ -1532,9 +1533,9 @@ func makeListArray(values arrow.Array, size, nullcount int) arrow.Array {
        nullBitmap := make([]byte, int(bitutil.BytesForBits(int64(size))))
 
        curOffset := 0
-       for i := 0; i < size; i++ {
+       for i := range size {
                offsetsArr[i] = int32(curOffset)
-               if !(((i % 2) == 0) && ((i / 2) < nullcount)) {
+               if i%2 != 0 || i/2 >= nullcount {
                        // non-null list (list with index 1 is always empty)
                        bitutil.SetBit(nullBitmap, i)
                        if i != 1 {
@@ -2108,6 +2109,105 @@ func (ps *ParquetIOTestSuite) TestStructWithListOfNestedStructs() {
        ps.roundTripTable(mem, expected, false)
 }
 
+// TestListOfStructWithEmptyListStoreSchema tests that ARROW:schema metadata stored
+// in a Parquet file uses "element" (not "item") as the list element field name, to
+// match the actual Parquet column paths. This is required for compatibility with
+// readers like Snowflake that resolve columns by matching ARROW:schema field names
+// to Parquet column path segments. See https://github.com/apache/arrow-go/issues/744.
+func TestListOfStructWithEmptyListStoreSchema(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       opsStruct := arrow.StructOf(
+               arrow.Field{Name: "id", Type: arrow.BinaryTypes.String, Nullable: false},
+               arrow.Field{Name: "token", Type: arrow.BinaryTypes.String, Nullable: true},
+               arrow.Field{Name: "amount", Type: arrow.BinaryTypes.String, Nullable: true},
+       )
+       // arrow.ListOf uses "item" as the element field name, which would mismatch
+       // the Parquet column path that uses "element". The fix ensures the stored
+       // ARROW:schema uses "element" to stay consistent with the Parquet columns.
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "block_num", Type: arrow.PrimitiveTypes.Uint64, Nullable: false},
+               {Name: "tx_id", Type: arrow.BinaryTypes.String, Nullable: false},
+               {Name: "ops", Type: arrow.ListOf(opsStruct), Nullable: true},
+       }, nil)
+
+       b := array.NewRecordBuilder(mem, schema)
+       defer b.Release()
+
+       b.Field(0).(*array.Uint64Builder).AppendValues([]uint64{100, 101, 102}, nil)
+       b.Field(1).(*array.StringBuilder).AppendValues([]string{"tx-a", "tx-b", "tx-c"}, nil)
+
+       lb := b.Field(2).(*array.ListBuilder)
+       sb := lb.ValueBuilder().(*array.StructBuilder)
+       idb := sb.FieldBuilder(0).(*array.StringBuilder)
+       tokb := sb.FieldBuilder(1).(*array.StringBuilder)
+       amtb := sb.FieldBuilder(2).(*array.StringBuilder)
+
+       lb.Append(true)
+       sb.Append(true)
+       idb.Append("op-1")
+       tokb.Append("USDC")
+       amtb.Append("10")
+       sb.Append(true)
+       idb.Append("op-2")
+       tokb.Append("ETH")
+       amtb.Append("1.5")
+       lb.Append(true) // empty list
+       lb.Append(true)
+       sb.Append(true)
+       idb.Append("op-3")
+       tokb.AppendNull()
+       amtb.Append("42")
+
+       rec := b.NewRecordBatch()
+       defer rec.Release()
+
+       var buf bytes.Buffer
+       props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(true), parquet.WithStats(true))
+       arrowProps := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())
+
+       pw, err := pqarrow.NewFileWriter(schema, &buf, props, arrowProps)
+       require.NoError(t, err)
+       require.NoError(t, pw.Write(rec))
+       require.NoError(t, pw.Close())
+
+       // Verify round-trip data is correct.
+       pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+       require.NoError(t, err)
+       defer pf.Close()
+
+       fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, mem)
+       require.NoError(t, err)
+
+       tbl, err := fr.ReadTable(context.Background())
+       require.NoError(t, err)
+       defer tbl.Release()
+
+       require.EqualValues(t, 3, tbl.NumRows())
+
+       // Verify the stored ARROW:schema uses "element" as the list element field name
+       // (consistent with the Parquet column path "ops.list.element.*"), not "item"
+       // (the default Arrow field name from arrow.ListOf()).
+       arrowSchemaEncoded := pf.MetaData().KeyValueMetadata().FindValue("ARROW:schema")
+       require.NotNil(t, arrowSchemaEncoded, "ARROW:schema metadata key must be present")
+       decoded, err := base64.StdEncoding.DecodeString(*arrowSchemaEncoded)
+       require.NoError(t, err)
+       // DeserializeSchema wraps bytes in an IPC stream; use ipc.NewReader to decode.
+       ipcRdr, err := ipc.NewReader(bytes.NewReader(decoded), ipc.WithAllocator(mem))
+       require.NoError(t, err)
+       defer ipcRdr.Release()
+       storedSchema := ipcRdr.Schema()
+
+       opsField, ok := storedSchema.FieldsByName("ops")
+       require.True(t, ok)
+       opsListType, ok := opsField[0].Type.(*arrow.ListType)
+       require.True(t, ok)
+       // Must be "element" (matching Parquet column path) not "item" (Arrow default).
+       assert.Equal(t, "element", opsListType.ElemField().Name,
+               "ARROW:schema element name must match the Parquet column path segment")
+}
+
 func TestParquetArrowIO(t *testing.T) {
        suite.Run(t, new(ParquetIOTestSuite))
 }
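
As a standalone usage sketch, the same decoding steps the new test performs
can be applied to any file written with pqarrow.WithStoreSchema(); the file
name "example.parquet" below is hypothetical:

    package main

    import (
            "bytes"
            "encoding/base64"
            "fmt"
            "log"

            "github.com/apache/arrow-go/v18/arrow/ipc"
            "github.com/apache/arrow-go/v18/parquet/file"
    )

    func main() {
            pf, err := file.OpenParquetFile("example.parquet", false)
            if err != nil {
                    log.Fatal(err)
            }
            defer pf.Close()

            // The stored schema lives in the file's key-value metadata,
            // base64-encoded as an Arrow IPC stream.
            enc := pf.MetaData().KeyValueMetadata().FindValue("ARROW:schema")
            if enc == nil {
                    log.Fatal("no ARROW:schema metadata present")
            }
            raw, err := base64.StdEncoding.DecodeString(*enc)
            if err != nil {
                    log.Fatal(err)
            }

            rdr, err := ipc.NewReader(bytes.NewReader(raw))
            if err != nil {
                    log.Fatal(err)
            }
            defer rdr.Release()
            fmt.Println(rdr.Schema())
    }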
diff --git a/parquet/pqarrow/file_writer.go b/parquet/pqarrow/file_writer.go
index dc2c90be..ff490b34 100644
--- a/parquet/pqarrow/file_writer.go
+++ b/parquet/pqarrow/file_writer.go
@@ -33,6 +33,74 @@ import (
        "github.com/apache/arrow-go/v18/parquet/metadata"
 )
 
+// normalizeFieldForParquet recursively normalizes an Arrow field so that its
+// type matches the Parquet column structure that fieldToNode would produce.
+// Specifically, list element field names are set to "element" because
+// ListOfWithName (used by fieldToNode) always names the Parquet element group
+// "element", regardless of the original Arrow element field name.
+func normalizeFieldForParquet(f arrow.Field) (arrow.Field, error) {
+       switch dt := f.Type.(type) {
+       case *arrow.ListType:
+               elem, err := normalizeFieldForParquet(dt.ElemField())
+               if err != nil {
+                       return arrow.Field{}, err
+               }
+               elem.Name = "element"
+               return arrow.Field{Name: f.Name, Type: arrow.ListOfField(elem), Nullable: f.Nullable, Metadata: f.Metadata}, nil
+       case *arrow.FixedSizeListType:
+               elem, err := normalizeFieldForParquet(dt.ElemField())
+               if err != nil {
+                       return arrow.Field{}, err
+               }
+               elem.Name = "element"
+               return arrow.Field{Name: f.Name, Type: arrow.FixedSizeListOfField(dt.Len(), elem), Nullable: f.Nullable, Metadata: f.Metadata}, nil
+       case *arrow.StructType:
+               fields := make([]arrow.Field, dt.NumFields())
+               for i := 0; i < dt.NumFields(); i++ {
+                       field, err := normalizeFieldForParquet(dt.Field(i))
+                       if err != nil {
+                               return arrow.Field{}, err
+                       }
+                       fields[i] = field
+               }
+               return arrow.Field{Name: f.Name, Type: arrow.StructOf(fields...), Nullable: f.Nullable, Metadata: f.Metadata}, nil
+       case *arrow.MapType:
+               key, err := normalizeFieldForParquet(dt.KeyField())
+               if err != nil {
+                       return arrow.Field{}, err
+               }
+               item, err := normalizeFieldForParquet(dt.ItemField())
+               if err != nil {
+                       return arrow.Field{}, err
+               }
+               return arrow.Field{Name: f.Name, Type: arrow.MapOfFields(key, item), Nullable: f.Nullable, Metadata: f.Metadata}, nil
+       case *arrow.RunEndEncodedType:
+               return arrow.Field{}, fmt.Errorf("RunEndEncoded types are not supported for writing to Parquet files: field %s", f.Name)
+       case *arrow.ListViewType:
+               return arrow.Field{}, fmt.Errorf("ListView types are not supported for writing to Parquet files: field %s", f.Name)
+       case *arrow.LargeListViewType:
+               return arrow.Field{}, fmt.Errorf("LargeListView types are not supported for writing to Parquet files: field %s", f.Name)
+       }
+       return f, nil
+}
+
+// normalizeSchemaForParquet returns a copy of the Arrow schema with list element
+// field names updated to "element" to match the Parquet column paths produced by
+// fieldToNode. This is used when storing the ARROW:schema metadata to ensure
+// consistency between the stored schema and the actual Parquet column structure.
+func normalizeSchemaForParquet(sc *arrow.Schema) (*arrow.Schema, error) {
+       fields := make([]arrow.Field, sc.NumFields())
+       for i, f := range sc.Fields() {
+               field, err := normalizeFieldForParquet(f)
+               if err != nil {
+                       return nil, err
+               }
+               fields[i] = field
+       }
+       meta := sc.Metadata()
+       return arrow.NewSchema(fields, &meta), nil
+}
+
 // WriteTable is a convenience function to create and write a full array.Table to a parquet file. The schema
 // and columns will be determined by the schema of the table, writing the file out to the provided writer.
 // The chunksize will be utilized in order to determine the size of the row groups.
@@ -80,7 +148,17 @@ func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterPr
        }
 
        if arrprops.storeSchema {
-               serializedSchema := flight.SerializeSchema(arrschema, props.Allocator())
+               // Normalize the Arrow schema so that list element field names match the
+               // Parquet column group names. fieldToNode always uses "element" as the
+               // Parquet group name for list element fields (via ListOfWithName), but
+               // arrow.ListOf() uses "item" as the Arrow element field name. This
+               // inconsistency causes readers (e.g. Snowflake) that map ARROW:schema field
+               // names to Parquet column paths to fail to locate the correct columns.
+               schemaToStore, err := normalizeSchemaForParquet(arrschema)
+               if err != nil {
+                       return nil, err
+               }
+               serializedSchema := flight.SerializeSchema(schemaToStore, props.Allocator())
                meta.Append("ARROW:schema", base64.StdEncoding.EncodeToString(serializedSchema))
        }
 
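
The effect of the normalization can be sketched with the public API alone
(normalizeFieldForParquet itself is unexported; the struct below is
illustrative):

    package main

    import (
            "fmt"

            "github.com/apache/arrow-go/v18/arrow"
    )

    func main() {
            opsStruct := arrow.StructOf(
                    arrow.Field{Name: "id", Type: arrow.BinaryTypes.String},
            )

            orig := arrow.ListOf(opsStruct) // element field named "item"

            // What the normalization does before the schema is serialized
            // into the ARROW:schema metadata value:
            elem := orig.ElemField()
            elem.Name = "element"
            stored := arrow.ListOfField(elem)

            fmt.Println(orig)   // list<item: struct<...>, ...>
            fmt.Println(stored) // list<element: struct<...>, ...>
    }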
diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go
index fa2246f3..cfa0574b 100644
--- a/parquet/pqarrow/schema.go
+++ b/parquet/pqarrow/schema.go
@@ -299,7 +299,7 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
                }
        case arrow.STRUCT:
                return structToNode(field, props, arrprops)
-       case arrow.FIXED_SIZE_LIST, arrow.LIST:
+       case arrow.FIXED_SIZE_LIST, arrow.LIST, arrow.LARGE_LIST:
                elemField := field.Type.(arrow.ListLikeType).ElemField()
 
                child, err := fieldToNode(name, elemField, props, arrprops)
@@ -722,7 +722,7 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
                //   If the name is array or ends in _tuple, this should be a list of struct
                //   even for single child elements.
                listGroup := listNode.(*schema.GroupNode)
-               if listGroup.NumFields() == 1 && !(listGroup.Name() == "array" || listGroup.Name() == (n.Name()+"_tuple")) {
+               if listGroup.NumFields() == 1 && (listGroup.Name() != "array" && listGroup.Name() != n.Name()+"_tuple") {
                        // list of primitive type
                        if err := nodeToSchemaField(listGroup.Field(0), currentLevels, ctx, out, &out.Children[0]); err != nil {
                                return err
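
The added arrow.LARGE_LIST case means a large list schema now converts
through fieldToNode like any other list, assuming it previously fell through
to the not-implemented default; a hedged sketch via the public
pqarrow.ToParquet entry point:

    package main

    import (
            "log"
            "os"

            "github.com/apache/arrow-go/v18/arrow"
            "github.com/apache/arrow-go/v18/parquet"
            "github.com/apache/arrow-go/v18/parquet/pqarrow"
            "github.com/apache/arrow-go/v18/parquet/schema"
    )

    func main() {
            sc := arrow.NewSchema([]arrow.Field{
                    {Name: "vals", Type: arrow.LargeListOf(arrow.PrimitiveTypes.Int64), Nullable: true},
            }, nil)

            pqSchema, err := pqarrow.ToParquet(sc,
                    parquet.NewWriterProperties(), pqarrow.NewArrowWriterProperties())
            if err != nil {
                    log.Fatal(err)
            }
            // Prints the Parquet group structure, e.g. vals.list.element.
            schema.PrintSchema(pqSchema.Root(), os.Stdout, 2)
    }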
diff --git a/parquet/schema/node.go b/parquet/schema/node.go
index 930c9a88..f95c6164 100644
--- a/parquet/schema/node.go
+++ b/parquet/schema/node.go
@@ -172,7 +172,7 @@ func NewPrimitiveNodeLogical(name string, repetition parquet.Repetition, logical
                n.convertedType, n.decimalMetaData = n.logicalType.ToConvertedType()
        }
 
-       if !(n.logicalType != nil && !n.logicalType.IsNested() && n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData)) {
+       if n.logicalType == nil || n.logicalType.IsNested() || !n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData) {
                return nil, fmt.Errorf("invalid logical type %s", n.logicalType)
        }
 
