This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
     new c43f0ed  fix(table/scanner): Fix nested field scan (#311)
c43f0ed is described below

commit c43f0ed44b87845a9b47524790087146c355798b
Author: Matt Topol <zotthewiz...@gmail.com>
AuthorDate: Sat Mar 1 12:22:28 2025 -0500

    fix(table/scanner): Fix nested field scan (#311)

    Fixes #309

    There was a combination of factors that caused the initial problem:

    1. The arrow-go/v18/parquet/pqarrow library wasn't properly propagating
       `PARQUET:field_id` metadata for children of List or Map typed fields.
    2. We only iterated the *fields* and skipped list/map types when
       selecting column indexes, which caused us to miss the children.
       Instead, we need to iterate all of the *field IDs*; this change
       updates that.
    3. When pruning parquet fields we were not propagating the correct
       ColIndex for map typed columns. Since we want the leaves, we need
       the ColIndex of the children.
    4. Creating the output arrays during `ToRequestedSchema` led to a
       memory leak for list/map columns, which needed to be fixed.

    A unit test has been added to ensure we are properly able to read the
    `test_all_types` table and get the rows without error.
---
 go.mod                          |  6 +++---
 go.sum                          |  8 ++++----
 io/s3.go                        |  1 +
 schema.go                       |  4 ++++
 table/arrow_scanner.go          | 18 ++++++++++-------
 table/arrow_utils.go            | 34 +++++++++++++++++++-------------
 table/arrow_utils_test.go       | 43 ++++++++++++++++++++++++++++++++++++++---
 table/internal/parquet_files.go | 11 +++++++----
 table/scanner_test.go           | 17 ++++++++++++++++
 9 files changed, 108 insertions(+), 34 deletions(-)

diff --git a/go.mod b/go.mod
index 4ae6456..0d0f6e4 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ go 1.23.0
 toolchain go1.23.6
 
 require (
-    github.com/apache/arrow-go/v18 v18.1.1-0.20250218215100-460f5004b92d
+    github.com/apache/arrow-go/v18 v18.1.1-0.20250226170053-efecae3596e6
     github.com/aws/aws-sdk-go-v2 v1.36.2
     github.com/aws/aws-sdk-go-v2/config v1.29.7
     github.com/aws/aws-sdk-go-v2/credentials v1.17.60
@@ -45,7 +45,6 @@ require (
     github.com/uptrace/bun/driver/sqliteshim v1.2.10
     github.com/uptrace/bun/extra/bundebug v1.2.10
     gocloud.dev v0.40.0
-    golang.org/x/exp v0.0.0-20250215185904-eff6e970281f
     golang.org/x/sync v0.11.0
     google.golang.org/api v0.222.0
     gopkg.in/yaml.v3 v3.0.1
@@ -103,7 +102,7 @@ require (
     github.com/jmespath/go-jmespath v0.4.0 // indirect
     github.com/json-iterator/go v1.1.12 // indirect
     github.com/klauspost/asmfmt v1.3.2 // indirect
-    github.com/klauspost/compress v1.17.11 // indirect
+    github.com/klauspost/compress v1.18.0 // indirect
     github.com/klauspost/cpuid/v2 v2.2.9 // indirect
     github.com/lithammer/fuzzysearch v1.1.8 // indirect
     github.com/mattn/go-colorable v0.1.14 // indirect
@@ -136,6 +135,7 @@ require (
     go.opentelemetry.io/otel/metric v1.34.0 // indirect
     go.opentelemetry.io/otel/trace v1.34.0 // indirect
     golang.org/x/crypto v0.33.0 // indirect
+    golang.org/x/exp v0.0.0-20250215185904-eff6e970281f // indirect
     golang.org/x/mod v0.23.0 // indirect
     golang.org/x/net v0.35.0 // indirect
     golang.org/x/oauth2 v0.26.0 // indirect
diff --git a/go.sum b/go.sum
index 8ec5666..b5df389 100644
--- a/go.sum
+++ b/go.sum
@@ -37,8 +37,8 @@ github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7X
 github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
 github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
 github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
-github.com/apache/arrow-go/v18 v18.1.1-0.20250218215100-460f5004b92d h1:OL//u0ke+2Dld3s5szg3KC7ZgwaATfdmF5LyxpRxtC8=
-github.com/apache/arrow-go/v18 v18.1.1-0.20250218215100-460f5004b92d/go.mod h1:6vlLiEonoJLH6uX9X84ki6mZlOPjuVC4qoQkxsLzwd8=
+github.com/apache/arrow-go/v18 v18.1.1-0.20250226170053-efecae3596e6 h1:RWgwATcu+8ZR65fmb/esaQkQqoCBhiRnB6WPMfAxJOs=
+github.com/apache/arrow-go/v18 v18.1.1-0.20250226170053-efecae3596e6/go.mod h1:ev6qDk3+bf7eh/goQbjA4ZViRDhu+9pmad8YTLvupIg=
 github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
 github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
 github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
@@ -188,8 +188,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
 github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
-github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
-github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
+github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
+github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
 github.com/klauspost/cpuid/v2 v2.0.10/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
 github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
diff --git a/io/s3.go b/io/s3.go
index 7a0791e..98ad4c6 100644
--- a/io/s3.go
+++ b/io/s3.go
@@ -138,6 +138,7 @@ func createS3Bucket(ctx context.Context, parsed *url.URL, props map[string]strin
             o.BaseEndpoint = aws.String(endpoint)
         }
         o.UsePathStyle = usePathStyle
+        o.DisableLogOutputChecksumValidationSkipped = true
     })
 
     // Create a *blob.Bucket.
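
[Note: the schema.go and table/arrow_scanner.go hunks below implement fix 2:
column selection now walks every field ID in the projected schema instead of
only its top-level fields. The following standalone sketch illustrates why
that matters; the schema, field IDs, and comments are invented for
illustration and are not part of the patch itself.]

package main

import (
	"fmt"

	iceberg "github.com/apache/iceberg-go"
)

func main() {
	// One list column: the list field itself has ID 1, its element has ID 2.
	sc := iceberg.NewSchema(0, iceberg.NestedField{
		ID: 1, Name: "nested", Required: true,
		Type: &iceberg.ListType{
			ElementID:       2,
			Element:         iceberg.PrimitiveTypes.Int32,
			ElementRequired: true,
		},
	})

	// The old selection loop only saw top-level fields: ID 1 is a ListType,
	// so it was skipped -- and the element at ID 2 was never selected.
	for _, f := range sc.Fields() {
		fmt.Println("top-level field ID:", f.ID) // prints 1 only
	}

	// FieldIDs (added by this commit) yields every ID in the schema, so the
	// leaf element is still selected even though its List parent is skipped.
	fmt.Println("all field IDs:", sc.FieldIDs()) // contains both 1 and 2
}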
diff --git a/schema.go b/schema.go
index 04c29e7..50d347c 100644
--- a/schema.go
+++ b/schema.go
@@ -165,6 +165,10 @@ func (s *Schema) AsStruct() StructType { return StructType{FieldList: s.field
 func (s *Schema) NumFields() int { return len(s.fields) }
 func (s *Schema) Field(i int) NestedField { return s.fields[i] }
 func (s *Schema) Fields() []NestedField { return slices.Clone(s.fields) }
+func (s *Schema) FieldIDs() []int {
+    idx, _ := s.lazyNameToID()
+    return slices.Collect(maps.Values(idx))
+}
 
 func (s *Schema) UnmarshalJSON(b []byte) error {
     type Alias Schema
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index f87514a..1a2cfcf 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -217,11 +217,12 @@ type arrowScan struct {
 
 func (as *arrowScan) projectedFieldIDs() (set[int], error) {
     idset := set[int]{}
-    for _, field := range as.projectedSchema.Fields() {
-        switch field.Type.(type) {
+    for _, id := range as.projectedSchema.FieldIDs() {
+        typ, _ := as.projectedSchema.FindTypeByID(id)
+        switch typ.(type) {
         case *iceberg.MapType, *iceberg.ListType:
         default:
-            idset[field.ID] = struct{}{}
+            idset[id] = struct{}{}
         }
     }
@@ -435,7 +436,7 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerat
     return
 }
 
-func createIterator(ctx context.Context, numWorkers uint, records <-chan enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelFunc, rowLimit int64) iter.Seq2[arrow.Record, error] {
+func createIterator(ctx context.Context, numWorkers uint, records <-chan enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelCauseFunc, rowLimit int64) iter.Seq2[arrow.Record, error] {
     isBeforeAny := func(batch enumeratedRecord) bool {
         return batch.Task.Index < 0
     }
@@ -484,11 +485,14 @@ func createIterator(ctx context.Context, numWorkers uint, records <-chan enumera
             }
         }()
 
-        defer cancel()
+        defer cancel(nil)
 
         for {
             select {
             case <-ctx.Done():
+                if err := context.Cause(ctx); err != nil {
+                    yield(nil, err)
+                }
                 return
             case enum, ok := <-sequenced:
                 if !ok {
@@ -531,7 +535,7 @@ func createIterator(ctx context.Context, numWorkers uint, records <-chan enumera
 func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, tasks []FileScanTask, deletesPerFile perFilePosDeletes) iter.Seq2[arrow.Record, error] {
     extSet := substrait.NewExtensionSet()
 
-    ctx, cancel := context.WithCancel(exprs.WithExtensionIDSet(ctx, extSet))
+    ctx, cancel := context.WithCancelCause(exprs.WithExtensionIDSet(ctx, extSet))
 
     taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks))
 
     // numWorkers := 1
@@ -554,7 +558,7 @@ func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, tasks
                 if err := as.recordsFromTask(ctx, task, records,
                     deletesPerFile[task.Value.File.FilePath()]); err != nil {
-                    cancel()
+                    cancel(err)
 
                     return
                 }
             }
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 2bfe881..490dac0 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -497,7 +497,10 @@ func (c convertToArrow) Field(field iceberg.NestedField, result arrow.Field) arr
 
 func (c convertToArrow) List(list iceberg.ListType, elemResult arrow.Field) arrow.Field {
     elemField := c.Field(list.ElementField(), elemResult)
-    return arrow.Field{Type: arrow.LargeListOfField(elemField)}
+    if c.useLargeTypes {
+        return arrow.Field{Type: arrow.LargeListOfField(elemField)}
+    }
+    return arrow.Field{Type: arrow.ListOfField(elemField)}
 }
 
 func (c convertToArrow) Map(m iceberg.MapType, keyResult, valResult arrow.Field) arrow.Field {
@@ -589,9 +592,9 @@ func SchemaToArrowSchema(sc *iceberg.Schema, metadata map[string]string, include
 // TypeToArrowType converts a given iceberg type, into the equivalent Arrow data type.
 // For dealing with nested fields (List, Struct, Map) if includeFieldIDs is true, then
 // the child fields will contain a metadata key PARQUET:field_id set to the field id.
-func TypeToArrowType(t iceberg.Type, includeFieldIDs bool) (arrow.DataType, error) {
+func TypeToArrowType(t iceberg.Type, includeFieldIDs bool, useLargeTypes bool) (arrow.DataType, error) {
     top, err := iceberg.Visit(iceberg.NewSchema(0, iceberg.NestedField{Type: t}),
-        convertToArrow{includeFieldIDs: includeFieldIDs})
+        convertToArrow{includeFieldIDs: includeFieldIDs, useLargeTypes: useLargeTypes})
     if err != nil {
         return nil, err
     }
@@ -677,7 +680,7 @@ func (a *arrowProjectionVisitor) castIfNeeded(field iceberg.NestedField, vals ar
         if !field.Type.Equals(typ) {
             promoted := retOrPanic(iceberg.PromoteType(fileField.Type, field.Type))
 
-            targetType := retOrPanic(TypeToArrowType(promoted, a.includeFieldIDs))
+            targetType := retOrPanic(TypeToArrowType(promoted, a.includeFieldIDs, a.useLargeTypes))
             if !a.useLargeTypes {
                 targetType = retOrPanic(ensureSmallArrowTypes(targetType))
             }
@@ -686,7 +689,7 @@ func (a *arrowProjectionVisitor) castIfNeeded(field iceberg.NestedField, vals ar
                 compute.SafeCastOptions(targetType)))
         }
 
-        targetType := retOrPanic(TypeToArrowType(field.Type, a.includeFieldIDs))
+        targetType := retOrPanic(TypeToArrowType(field.Type, a.includeFieldIDs, a.useLargeTypes))
         if !arrow.TypeEqual(targetType, vals.DataType()) {
             switch field.Type.(type) {
             case iceberg.TimestampType:
@@ -756,12 +759,16 @@ func (a *arrowProjectionVisitor) Struct(st iceberg.StructType, structArr arrow.A
     for i, field := range st.FieldList {
         arr := fieldResults[i]
         if arr != nil {
+            if _, ok := arr.DataType().(arrow.NestedType); ok {
+                defer arr.Release()
+            }
+
             arr = a.castIfNeeded(field, arr)
             defer arr.Release()
             fieldArrs[i] = arr
             fields[i] = a.constructField(field, arr.DataType())
         } else if !field.Required {
-            dt := retOrPanic(TypeToArrowType(field.Type, false))
+            dt := retOrPanic(TypeToArrowType(field.Type, false, a.useLargeTypes))
             arr = array.MakeArrayOfNull(compute.GetAllocator(a.ctx), dt, structArr.Len())
             defer arr.Release()
@@ -786,11 +793,11 @@ func (a *arrowProjectionVisitor) List(listType iceberg.ListType, listArr arrow.A
             return nil
         }
 
-        valueArr := a.castIfNeeded(listType.ElementField(), valArr)
-        defer valueArr.Release()
+        valArr = a.castIfNeeded(listType.ElementField(), valArr)
+        defer valArr.Release()
 
         var outType arrow.ListLikeType
-        elemField := a.constructField(listType.ElementField(), valueArr.DataType())
+        elemField := a.constructField(listType.ElementField(), valArr.DataType())
         switch arr.DataType().ID() {
         case arrow.LIST:
             outType = arrow.ListOfField(elemField)
@@ -801,7 +808,7 @@ func (a *arrowProjectionVisitor) List(listType iceberg.ListType, listArr arrow.A
         }
 
         data := array.NewData(outType, arr.Len(), arr.Data().Buffers(),
-            []arrow.ArrayData{valueArr.Data()}, arr.NullN(), arr.Data().Offset())
+            []arrow.ArrayData{valArr.Data()}, arr.NullN(), arr.Data().Offset())
         defer data.Release()
 
         return array.MakeFromData(data)
     }
@@ -855,7 +862,8 @@ func ToRequestedSchema(ctx context.Context, requested, fileSchema *iceberg.Schem
     if err != nil {
         return nil, err
     }
-    defer result.Release()
-
-    return array.RecordFromStructArray(result.(*array.Struct), nil), nil
+    st.Release()
+    out := array.RecordFromStructArray(result.(*array.Struct), nil)
+    result.Release()
+    return out, nil
 }
diff --git a/table/arrow_utils_test.go b/table/arrow_utils_test.go
index 134d34d..058721d 100644
--- a/table/arrow_utils_test.go
+++ b/table/arrow_utils_test.go
@@ -18,6 +18,7 @@ package table_test
 
 import (
+    "bufio"
     "context"
     "strings"
     "testing"
@@ -114,7 +115,7 @@ func TestArrowToIceberg(t *testing.T) {
             ElementID: 1, Element: iceberg.PrimitiveTypes.Int32,
             ElementRequired: true,
-        }, false, ""},
+        }, true, ""},
         {arrow.LargeListOfField(arrow.Field{
             Name: "element",
             Type: arrow.PrimitiveTypes.Int32,
@@ -124,7 +125,7 @@
             ElementID: 1, Element: iceberg.PrimitiveTypes.Int32,
             ElementRequired: true,
-        }, true, ""},
+        }, false, ""},
         {arrow.FixedSizeListOfField(1, arrow.Field{
             Name: "element",
             Type: arrow.PrimitiveTypes.Int32,
@@ -164,7 +165,7 @@
         }
 
         if tt.reciprocal {
-            result, err := table.TypeToArrowType(tt.ice, true)
+            result, err := table.TypeToArrowType(tt.ice, true, false)
             require.NoError(t, err)
             assert.True(t, arrow.TypeEqual(tt.dt, result), tt.dt.String(), result.String())
         }
@@ -510,3 +511,39 @@ func TestToRequestedSchemaTimestamps(t *testing.T) {
         }
     }
 }
+
+func TestToRequestedSchema(t *testing.T) {
+    mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+    defer mem.AssertSize(t, 0)
+
+    schema := arrow.NewSchema([]arrow.Field{
+        {
+            Name: "nested", Type: arrow.ListOfField(arrow.Field{
+                Name: "element", Type: arrow.PrimitiveTypes.Int32, Nullable: false,
+                Metadata: arrow.NewMetadata([]string{table.ArrowParquetFieldIDKey}, []string{"2"}),
+            }),
+            Metadata: arrow.NewMetadata([]string{table.ArrowParquetFieldIDKey}, []string{"1"}),
+        },
+    }, nil)
+
+    bldr := array.NewRecordBuilder(mem, schema)
+    defer bldr.Release()
+
+    const data = `{"nested": [1, 2, 3]}
+    {"nested": [4, 5, 6]}`
+
+    s := bufio.NewScanner(strings.NewReader(data))
+    require.True(t, s.Scan())
+    require.NoError(t, bldr.UnmarshalJSON(s.Bytes()))
+
+    rec := bldr.NewRecord()
+    defer rec.Release()
+
+    icesc, err := table.ArrowSchemaToIceberg(schema, false, nil)
+    require.NoError(t, err)
+
+    rec2, err := table.ToRequestedSchema(context.Background(), icesc, icesc, rec, true, true, false)
+    require.NoError(t, err)
+    defer rec2.Release()
+
+    assert.True(t, array.RecordEqual(rec, rec2))
+}
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 1c5d38e..c128f5b 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -149,7 +149,8 @@ func visitParquetManifestStruct[T any](field pqarrow.SchemaField, visitor manife
     results := make([]T, len(field.Children))
 
     for i, f := range field.Children {
-        results[i] = visitManifestField(f, visitor)
+        res := visitManifestField(f, visitor)
+        results[i] = visitor.Field(f, res)
     }
 
     return visitor.Struct(field, results)
@@ -189,7 +190,7 @@ func pruneParquetColumns(manifest *pqarrow.SchemaManifest, selected map[int]stru
         indices: []int{},
     }
 
-    result, err := visitParquetManifest[arrow.Field](manifest, visitor)
+    result, err := visitParquetManifest(manifest, visitor)
     if err != nil {
         return nil, nil, err
     }
@@ -335,7 +336,7 @@ func (p *pruneParquetSchema) List(field pqarrow.SchemaField, elemResult arrow.Fi
         panic(fmt.Errorf("cannot explicitly project list or map types"))
     }
 
-    p.indices = append(p.indices, field.ColIndex)
+    p.indices = append(p.indices, field.Children[0].ColIndex)
 
     return *field.Field
 }
@@ -371,10 +372,12 @@ func (p *pruneParquetSchema) Map(field pqarrow.SchemaField, keyResult, valResult
         panic("cannot explicitly project list or map types")
     }
 
+    p.indices = append(p.indices, field.Children[0].Children[0].ColIndex)
+    p.indices = append(p.indices, field.Children[0].Children[1].ColIndex)
 
     return *field.Field
 }
 
-func (p *pruneParquetSchema) Primitive(field pqarrow.SchemaField) arrow.Field {
+func (p *pruneParquetSchema) Primitive(_ pqarrow.SchemaField) arrow.Field {
     return arrow.Field{}
 }
diff --git a/table/scanner_test.go b/table/scanner_test.go
index 761d941..a3ab031 100644
--- a/table/scanner_test.go
+++ b/table/scanner_test.go
@@ -458,6 +458,23 @@ func (s *ScannerSuite) TestPartitionedTables() {
     }
 }
 
+func (s *ScannerSuite) TestNestedColumns() {
+    mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+    defer mem.AssertSize(s.T(), 0)
+
+    ident := catalog.ToIdentifier("default", "test_all_types")
+
+    tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+    s.Require().NoError(err)
+
+    ctx := compute.WithAllocator(s.ctx, mem)
+    results, err := tbl.Scan().ToArrowTable(ctx)
+    s.Require().NoError(err)
+    defer results.Release()
+
+    s.EqualValues(5, results.NumRows())
+}
+
 func (s *ScannerSuite) TestUnpartitionedUUIDTable() {
     mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
     defer mem.AssertSize(s.T(), 0)
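
[Note: the table/arrow_scanner.go hunks above also swap context.WithCancel for
context.WithCancelCause, which is what lets the iterator report *why* a scan
stopped instead of yielding a bare context.Canceled. The following is a small
self-contained sketch of that standard-library pattern, not code from this
repository; the error text is invented.]

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil) // a nil cause leaves the default context.Canceled

	go func() {
		// A failing worker cancels the shared context *with* its error,
		// mirroring the cancel(err) call in recordBatchesFromTasksAndDeletes.
		cancel(errors.New("hypothetical: reading data file failed"))
	}()

	<-ctx.Done()
	// The consumer recovers the original failure via context.Cause, which is
	// what createIterator now yields to the caller when ctx.Done() fires.
	fmt.Println(context.Cause(ctx))
}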