This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push: new 2cf2b29 feat(parquet/pqarrow): Correctly handle Variant types in schema (#433) 2cf2b29 is described below commit 2cf2b297672f37895c83a42bfaa1ac08352efc16 Author: Matt Topol <zotthewiz...@gmail.com> AuthorDate: Tue Jul 8 13:36:15 2025 -0400 feat(parquet/pqarrow): Correctly handle Variant types in schema (#433) ### Rationale for this change Updating the `pqarrow` package to handle the variant extension type when converting between arrow and parquet schemas. ### What changes are included in this PR? Replacing the TODOs with implementations to handle shredded variant structures in schema conversion. ### Are these changes tested? A unit test is added for shredded variant handling. ### Are there any user-facing changes? Only that this is now supported instead of erroring. --- arrow/extensions/variant.go | 8 ++++ parquet/pqarrow/schema.go | 22 ++++++--- parquet/pqarrow/schema_test.go | 104 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 7 deletions(-) diff --git a/arrow/extensions/variant.go b/arrow/extensions/variant.go index 7098b77..fbef4a6 100644 --- a/arrow/extensions/variant.go +++ b/arrow/extensions/variant.go @@ -168,6 +168,14 @@ func (v *VariantType) Value() arrow.Field { return v.StorageType().(*arrow.StructType).Field(v.valueFieldIdx) } +func (v *VariantType) TypedValue() arrow.Field { + if v.typedValueFieldIdx == -1 { + return arrow.Field{} + } + + return v.StorageType().(*arrow.StructType).Field(v.typedValueFieldIdx) +} + func (*VariantType) ExtensionName() string { return "parquet.variant" } func (v *VariantType) String() string { diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go index 34e4cc6..17603b9 100644 --- a/parquet/pqarrow/schema.go +++ b/parquet/pqarrow/schema.go @@ -253,11 +253,19 @@ func variantToNode(t *extensions.VariantType, field arrow.Field, props *parquet. 
return nil, err } - //TODO: implement shredding + fields := schema.FieldList{metadataNode, valueNode} + + typedField := t.TypedValue() + if typedField.Type != nil { + typedNode, err := fieldToNode("typed_value", typedField, props, arrProps) + if err != nil { + return nil, err + } + fields = append(fields, typedNode) + } return schema.NewGroupNodeLogical(field.Name, repFromNullable(field.Nullable), - schema.FieldList{metadataNode, valueNode}, schema.VariantLogicalType{}, - fieldIDFromMeta(field.Metadata)) + fields, schema.VariantLogicalType{}, fieldIDFromMeta(field.Metadata)) } func structToNode(field arrow.Field, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (schema.Node, error) { @@ -857,10 +865,10 @@ func mapToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *sc } func variantToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *schemaTree, _, out *SchemaField) error { - // this is for unshredded variants. shredded variants may have more fields - // TODO: implement support for shredded variants - if n.NumFields() != 2 { - return errors.New("VARIANT group must have exactly 2 children") + switch n.NumFields() { + case 2, 3: + default: + return errors.New("VARIANT group must have exactly 2 or 3 children") } var err error diff --git a/parquet/pqarrow/schema_test.go b/parquet/pqarrow/schema_test.go index 58475dc..6f3da88 100644 --- a/parquet/pqarrow/schema_test.go +++ b/parquet/pqarrow/schema_test.go @@ -534,3 +534,107 @@ func TestConvertSchemaParquetVariant(t *testing.T) { require.NoError(t, err) assert.True(t, pqschema.Equals(sc), pqschema.String(), sc.String()) } + +func TestShreddedVariantSchema(t *testing.T) { + metaNoFieldID := arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"-1"}) + + s := arrow.StructOf( + arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary, Metadata: metaNoFieldID}, + arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID}, + 
arrow.Field{Name: "typed_value", Type: arrow.StructOf( + arrow.Field{Name: "tsmicro", Type: arrow.StructOf( + arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID}, + arrow.Field{Name: "typed_value", Type: arrow.FixedWidthTypes.Timestamp_us, Nullable: true, Metadata: metaNoFieldID}, + ), Metadata: metaNoFieldID}, + arrow.Field{Name: "strval", Type: arrow.StructOf( + arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID}, + arrow.Field{Name: "typed_value", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: metaNoFieldID}, + ), Metadata: metaNoFieldID}, + arrow.Field{Name: "bool", Type: arrow.StructOf( + arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID}, + arrow.Field{Name: "typed_value", Type: arrow.FixedWidthTypes.Boolean, Nullable: true, Metadata: metaNoFieldID}, + ), Metadata: metaNoFieldID}, + arrow.Field{Name: "uuid", Type: arrow.StructOf( + arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID}, + arrow.Field{Name: "typed_value", Type: extensions.NewUUIDType(), Nullable: true, Metadata: metaNoFieldID}, + ), Metadata: metaNoFieldID}, + ), Nullable: true, Metadata: metaNoFieldID}) + + vt, err := extensions.NewVariantType(s) + require.NoError(t, err) + + arrSchema := arrow.NewSchema([]arrow.Field{ + {Name: "variant_col", Type: vt, Nullable: true, Metadata: metaNoFieldID}, + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false, Metadata: metaNoFieldID}, + }, nil) + + sc, err := pqarrow.ToParquet(arrSchema, nil, pqarrow.DefaultWriterProps()) + require.NoError(t, err) + + // the equivalent shredded variant parquet schema looks like this: + // repeated group field_id=-1 schema { + // optional group field_id=-1 variant_col (Variant) { + // required byte_array field_id=-1 metadata; + // optional byte_array field_id=-1 value; + // optional group field_id=-1 typed_value { 
+ // required group field_id=-1 tsmicro { + // optional byte_array field_id=-1 value; + // optional int64 field_id=-1 typed_value (Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=true)); + // } + // required group field_id=-1 strval { + // optional byte_array field_id=-1 value; + // optional byte_array field_id=-1 typed_value (String); + // } + // required group field_id=-1 bool { + // optional byte_array field_id=-1 value; + // optional boolean field_id=-1 typed_value; + // } + // required group field_id=-1 uuid { + // optional byte_array field_id=-1 value; + // optional fixed_len_byte_array field_id=-1 typed_value (UUID); + // } + // } + // } + // required int64 field_id=-1 id (Int(bitWidth=64, isSigned=true)); + // } + + expected := schema.NewSchema(schema.MustGroup(schema.NewGroupNode("schema", + parquet.Repetitions.Repeated, schema.FieldList{ + schema.Must(schema.NewGroupNodeLogical("variant_col", parquet.Repetitions.Optional, schema.FieldList{ + schema.MustPrimitive(schema.NewPrimitiveNode("metadata", parquet.Repetitions.Required, parquet.Types.ByteArray, -1, -1)), + schema.MustPrimitive(schema.NewPrimitiveNode("value", parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)), + schema.MustGroup(schema.NewGroupNode("typed_value", parquet.Repetitions.Optional, schema.FieldList{ + schema.MustGroup(schema.NewGroupNode("tsmicro", parquet.Repetitions.Required, schema.FieldList{ + schema.MustPrimitive(schema.NewPrimitiveNode("value", parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)), + schema.MustPrimitive(schema.NewPrimitiveNodeLogical("typed_value", parquet.Repetitions.Optional, schema.NewTimestampLogicalTypeWithOpts( + schema.WithTSTimeUnitType(schema.TimeUnitMicros), schema.WithTSIsAdjustedToUTC(), schema.WithTSForceConverted(), + ), parquet.Types.Int64, -1, -1)), + }, -1)), + schema.MustGroup(schema.NewGroupNode("strval", parquet.Repetitions.Required, schema.FieldList{ + 
schema.MustPrimitive(schema.NewPrimitiveNode("value", parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)), + schema.MustPrimitive(schema.NewPrimitiveNodeLogical("typed_value", parquet.Repetitions.Optional, + schema.StringLogicalType{}, parquet.Types.ByteArray, -1, -1)), + }, -1)), + schema.MustGroup(schema.NewGroupNode("bool", parquet.Repetitions.Required, schema.FieldList{ + schema.MustPrimitive(schema.NewPrimitiveNode("value", parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)), + schema.MustPrimitive(schema.NewPrimitiveNode("typed_value", parquet.Repetitions.Optional, + parquet.Types.Boolean, -1, -1)), + }, -1)), + schema.MustGroup(schema.NewGroupNode("uuid", parquet.Repetitions.Required, schema.FieldList{ + schema.MustPrimitive(schema.NewPrimitiveNode("value", parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)), + schema.MustPrimitive(schema.NewPrimitiveNodeLogical("typed_value", parquet.Repetitions.Optional, + schema.UUIDLogicalType{}, parquet.Types.FixedLenByteArray, 16, -1)), + }, -1)), + }, -1)), + }, schema.VariantLogicalType{}, -1)), + schema.MustPrimitive(schema.NewPrimitiveNodeLogical("id", parquet.Repetitions.Required, + schema.NewIntLogicalType(64, true), parquet.Types.Int64, -1, -1)), + }, -1))) + + assert.True(t, sc.Equals(expected), "expected: %s\ngot: %s", expected, sc) + + arrsc, err := pqarrow.FromParquet(sc, nil, metadata.KeyValueMetadata{}) + require.NoError(t, err) + + assert.True(t, arrSchema.Equal(arrsc), "expected: %s\ngot: %s", arrSchema, arrsc) +}