This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new c43f0ed  fix(table/scanner): Fix nested field scan (#311)
c43f0ed is described below

commit c43f0ed44b87845a9b47524790087146c355798b
Author: Matt Topol <zotthewiz...@gmail.com>
AuthorDate: Sat Mar 1 12:22:28 2025 -0500

    fix(table/scanner): Fix nested field scan (#311)
    
    Fixes #309
    
    There was a combination of factors that caused the initial problem:
    
    1. The arrow-go/v18/parquet/pqarrow library wasn't properly propagating
    `PARQUET:field_id` metadata for children of List or Map typed fields
    2. We only iterated the *fields* and skipped list/maptypes when
    selecting column indexes, this caused us to miss the children. Instead
    we need to iterate all of the *field IDs*, this change updates that.
    3. When pruning parquet fields we were not propagating the correct
    ColIndex for map typed columns, we want the leaves so we need the
    ColIndex of the children
    4. creating the output arrays during `ToRequestedSchema` led to a memory
    leak for list/map columns that needed to be fixed.
    
    A unit test has been added to ensure we are properly able to read the
    `test_all_types` table and get the rows without error.
---
 go.mod                          |  6 +++---
 go.sum                          |  8 ++++----
 io/s3.go                        |  1 +
 schema.go                       |  4 ++++
 table/arrow_scanner.go          | 18 ++++++++++-------
 table/arrow_utils.go            | 34 +++++++++++++++++++-------------
 table/arrow_utils_test.go       | 43 ++++++++++++++++++++++++++++++++++++++---
 table/internal/parquet_files.go | 11 +++++++----
 table/scanner_test.go           | 17 ++++++++++++++++
 9 files changed, 108 insertions(+), 34 deletions(-)

diff --git a/go.mod b/go.mod
index 4ae6456..0d0f6e4 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ go 1.23.0
 toolchain go1.23.6
 
 require (
-       github.com/apache/arrow-go/v18 v18.1.1-0.20250218215100-460f5004b92d
+       github.com/apache/arrow-go/v18 v18.1.1-0.20250226170053-efecae3596e6
        github.com/aws/aws-sdk-go-v2 v1.36.2
        github.com/aws/aws-sdk-go-v2/config v1.29.7
        github.com/aws/aws-sdk-go-v2/credentials v1.17.60
@@ -45,7 +45,6 @@ require (
        github.com/uptrace/bun/driver/sqliteshim v1.2.10
        github.com/uptrace/bun/extra/bundebug v1.2.10
        gocloud.dev v0.40.0
-       golang.org/x/exp v0.0.0-20250215185904-eff6e970281f
        golang.org/x/sync v0.11.0
        google.golang.org/api v0.222.0
        gopkg.in/yaml.v3 v3.0.1
@@ -103,7 +102,7 @@ require (
        github.com/jmespath/go-jmespath v0.4.0 // indirect
        github.com/json-iterator/go v1.1.12 // indirect
        github.com/klauspost/asmfmt v1.3.2 // indirect
-       github.com/klauspost/compress v1.17.11 // indirect
+       github.com/klauspost/compress v1.18.0 // indirect
        github.com/klauspost/cpuid/v2 v2.2.9 // indirect
        github.com/lithammer/fuzzysearch v1.1.8 // indirect
        github.com/mattn/go-colorable v0.1.14 // indirect
@@ -136,6 +135,7 @@ require (
        go.opentelemetry.io/otel/metric v1.34.0 // indirect
        go.opentelemetry.io/otel/trace v1.34.0 // indirect
        golang.org/x/crypto v0.33.0 // indirect
+       golang.org/x/exp v0.0.0-20250215185904-eff6e970281f // indirect
        golang.org/x/mod v0.23.0 // indirect
        golang.org/x/net v0.35.0 // indirect
        golang.org/x/oauth2 v0.26.0 // indirect
diff --git a/go.sum b/go.sum
index 8ec5666..b5df389 100644
--- a/go.sum
+++ b/go.sum
@@ -37,8 +37,8 @@ github.com/andybalholm/brotli v1.1.1 
h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7X
 github.com/andybalholm/brotli v1.1.1/go.mod 
h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
 github.com/antlr4-go/antlr/v4 v4.13.1 
h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
 github.com/antlr4-go/antlr/v4 v4.13.1/go.mod 
h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
-github.com/apache/arrow-go/v18 v18.1.1-0.20250218215100-460f5004b92d 
h1:OL//u0ke+2Dld3s5szg3KC7ZgwaATfdmF5LyxpRxtC8=
-github.com/apache/arrow-go/v18 v18.1.1-0.20250218215100-460f5004b92d/go.mod 
h1:6vlLiEonoJLH6uX9X84ki6mZlOPjuVC4qoQkxsLzwd8=
+github.com/apache/arrow-go/v18 v18.1.1-0.20250226170053-efecae3596e6 
h1:RWgwATcu+8ZR65fmb/esaQkQqoCBhiRnB6WPMfAxJOs=
+github.com/apache/arrow-go/v18 v18.1.1-0.20250226170053-efecae3596e6/go.mod 
h1:ev6qDk3+bf7eh/goQbjA4ZViRDhu+9pmad8YTLvupIg=
 github.com/apache/thrift v0.21.0 
h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
 github.com/apache/thrift v0.21.0/go.mod 
h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
 github.com/atomicgo/cursor v0.0.1/go.mod 
h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
@@ -188,8 +188,8 @@ github.com/json-iterator/go v1.1.12 
h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
 github.com/json-iterator/go v1.1.12/go.mod 
h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/klauspost/asmfmt v1.3.2 
h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
 github.com/klauspost/asmfmt v1.3.2/go.mod 
h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
-github.com/klauspost/compress v1.17.11 
h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
-github.com/klauspost/compress v1.17.11/go.mod 
h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
+github.com/klauspost/compress v1.18.0 
h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
+github.com/klauspost/compress v1.18.0/go.mod 
h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
 github.com/klauspost/cpuid/v2 v2.0.9/go.mod 
h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
 github.com/klauspost/cpuid/v2 v2.0.10/go.mod 
h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
 github.com/klauspost/cpuid/v2 v2.0.12/go.mod 
h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
diff --git a/io/s3.go b/io/s3.go
index 7a0791e..98ad4c6 100644
--- a/io/s3.go
+++ b/io/s3.go
@@ -138,6 +138,7 @@ func createS3Bucket(ctx context.Context, parsed *url.URL, 
props map[string]strin
                        o.BaseEndpoint = aws.String(endpoint)
                }
                o.UsePathStyle = usePathStyle
+               o.DisableLogOutputChecksumValidationSkipped = true
        })
 
        // Create a *blob.Bucket.
diff --git a/schema.go b/schema.go
index 04c29e7..50d347c 100644
--- a/schema.go
+++ b/schema.go
@@ -165,6 +165,10 @@ func (s *Schema) AsStruct() StructType    { return 
StructType{FieldList: s.field
 func (s *Schema) NumFields() int          { return len(s.fields) }
 func (s *Schema) Field(i int) NestedField { return s.fields[i] }
 func (s *Schema) Fields() []NestedField   { return slices.Clone(s.fields) }
+func (s *Schema) FieldIDs() []int {
+       idx, _ := s.lazyNameToID()
+       return slices.Collect(maps.Values(idx))
+}
 
 func (s *Schema) UnmarshalJSON(b []byte) error {
        type Alias Schema
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index f87514a..1a2cfcf 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -217,11 +217,12 @@ type arrowScan struct {
 
 func (as *arrowScan) projectedFieldIDs() (set[int], error) {
        idset := set[int]{}
-       for _, field := range as.projectedSchema.Fields() {
-               switch field.Type.(type) {
+       for _, id := range as.projectedSchema.FieldIDs() {
+               typ, _ := as.projectedSchema.FindTypeByID(id)
+               switch typ.(type) {
                case *iceberg.MapType, *iceberg.ListType:
                default:
-                       idset[field.ID] = struct{}{}
+                       idset[id] = struct{}{}
                }
        }
 
@@ -435,7 +436,7 @@ func (as *arrowScan) recordsFromTask(ctx context.Context, 
task internal.Enumerat
        return
 }
 
-func createIterator(ctx context.Context, numWorkers uint, records <-chan 
enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelFunc, 
rowLimit int64) iter.Seq2[arrow.Record, error] {
+func createIterator(ctx context.Context, numWorkers uint, records <-chan 
enumeratedRecord, deletesPerFile perFilePosDeletes, cancel 
context.CancelCauseFunc, rowLimit int64) iter.Seq2[arrow.Record, error] {
        isBeforeAny := func(batch enumeratedRecord) bool {
                return batch.Task.Index < 0
        }
@@ -484,11 +485,14 @@ func createIterator(ctx context.Context, numWorkers uint, 
records <-chan enumera
                        }
                }()
 
-               defer cancel()
+               defer cancel(nil)
 
                for {
                        select {
                        case <-ctx.Done():
+                               if err := context.Cause(ctx); err != nil {
+                                       yield(nil, err)
+                               }
                                return
                        case enum, ok := <-sequenced:
                                if !ok {
@@ -531,7 +535,7 @@ func createIterator(ctx context.Context, numWorkers uint, 
records <-chan enumera
 func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, 
tasks []FileScanTask, deletesPerFile perFilePosDeletes) iter.Seq2[arrow.Record, 
error] {
        extSet := substrait.NewExtensionSet()
 
-       ctx, cancel := context.WithCancel(exprs.WithExtensionIDSet(ctx, extSet))
+       ctx, cancel := context.WithCancelCause(exprs.WithExtensionIDSet(ctx, 
extSet))
        taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks))
 
        // numWorkers := 1
@@ -554,7 +558,7 @@ func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx 
context.Context, tasks
 
                                        if err := as.recordsFromTask(ctx, task, 
records,
                                                
deletesPerFile[task.Value.File.FilePath()]); err != nil {
-                                               cancel()
+                                               cancel(err)
                                                return
                                        }
                                }
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 2bfe881..490dac0 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -497,7 +497,10 @@ func (c convertToArrow) Field(field iceberg.NestedField, 
result arrow.Field) arr
 
 func (c convertToArrow) List(list iceberg.ListType, elemResult arrow.Field) 
arrow.Field {
        elemField := c.Field(list.ElementField(), elemResult)
-       return arrow.Field{Type: arrow.LargeListOfField(elemField)}
+       if c.useLargeTypes {
+               return arrow.Field{Type: arrow.LargeListOfField(elemField)}
+       }
+       return arrow.Field{Type: arrow.ListOfField(elemField)}
 }
 
 func (c convertToArrow) Map(m iceberg.MapType, keyResult, valResult 
arrow.Field) arrow.Field {
@@ -589,9 +592,9 @@ func SchemaToArrowSchema(sc *iceberg.Schema, metadata 
map[string]string, include
 // TypeToArrowType converts a given iceberg type, into the equivalent Arrow 
data type.
 // For dealing with nested fields (List, Struct, Map) if includeFieldIDs is 
true, then
 // the child fields will contain a metadata key PARQUET:field_id set to the 
field id.
-func TypeToArrowType(t iceberg.Type, includeFieldIDs bool) (arrow.DataType, 
error) {
+func TypeToArrowType(t iceberg.Type, includeFieldIDs bool, useLargeTypes bool) 
(arrow.DataType, error) {
        top, err := iceberg.Visit(iceberg.NewSchema(0, 
iceberg.NestedField{Type: t}),
-               convertToArrow{includeFieldIDs: includeFieldIDs})
+               convertToArrow{includeFieldIDs: includeFieldIDs, useLargeTypes: 
useLargeTypes})
        if err != nil {
                return nil, err
        }
@@ -677,7 +680,7 @@ func (a *arrowProjectionVisitor) castIfNeeded(field 
iceberg.NestedField, vals ar
 
        if !field.Type.Equals(typ) {
                promoted := retOrPanic(iceberg.PromoteType(fileField.Type, 
field.Type))
-               targetType := retOrPanic(TypeToArrowType(promoted, 
a.includeFieldIDs))
+               targetType := retOrPanic(TypeToArrowType(promoted, 
a.includeFieldIDs, a.useLargeTypes))
                if !a.useLargeTypes {
                        targetType = 
retOrPanic(ensureSmallArrowTypes(targetType))
                }
@@ -686,7 +689,7 @@ func (a *arrowProjectionVisitor) castIfNeeded(field 
iceberg.NestedField, vals ar
                        compute.SafeCastOptions(targetType)))
        }
 
-       targetType := retOrPanic(TypeToArrowType(field.Type, a.includeFieldIDs))
+       targetType := retOrPanic(TypeToArrowType(field.Type, a.includeFieldIDs, 
a.useLargeTypes))
        if !arrow.TypeEqual(targetType, vals.DataType()) {
                switch field.Type.(type) {
                case iceberg.TimestampType:
@@ -756,12 +759,16 @@ func (a *arrowProjectionVisitor) Struct(st 
iceberg.StructType, structArr arrow.A
        for i, field := range st.FieldList {
                arr := fieldResults[i]
                if arr != nil {
+                       if _, ok := arr.DataType().(arrow.NestedType); ok {
+                               defer arr.Release()
+                       }
+
                        arr = a.castIfNeeded(field, arr)
                        defer arr.Release()
                        fieldArrs[i] = arr
                        fields[i] = a.constructField(field, arr.DataType())
                } else if !field.Required {
-                       dt := retOrPanic(TypeToArrowType(field.Type, false))
+                       dt := retOrPanic(TypeToArrowType(field.Type, false, 
a.useLargeTypes))
 
                        arr = 
array.MakeArrayOfNull(compute.GetAllocator(a.ctx), dt, structArr.Len())
                        defer arr.Release()
@@ -786,11 +793,11 @@ func (a *arrowProjectionVisitor) List(listType 
iceberg.ListType, listArr arrow.A
                return nil
        }
 
-       valueArr := a.castIfNeeded(listType.ElementField(), valArr)
-       defer valueArr.Release()
+       valArr = a.castIfNeeded(listType.ElementField(), valArr)
+       defer valArr.Release()
 
        var outType arrow.ListLikeType
-       elemField := a.constructField(listType.ElementField(), 
valueArr.DataType())
+       elemField := a.constructField(listType.ElementField(), 
valArr.DataType())
        switch arr.DataType().ID() {
        case arrow.LIST:
                outType = arrow.ListOfField(elemField)
@@ -801,7 +808,7 @@ func (a *arrowProjectionVisitor) List(listType 
iceberg.ListType, listArr arrow.A
        }
 
        data := array.NewData(outType, arr.Len(), arr.Data().Buffers(),
-               []arrow.ArrayData{valueArr.Data()}, arr.NullN(), 
arr.Data().Offset())
+               []arrow.ArrayData{valArr.Data()}, arr.NullN(), 
arr.Data().Offset())
        defer data.Release()
        return array.MakeFromData(data)
 }
@@ -855,7 +862,8 @@ func ToRequestedSchema(ctx context.Context, requested, 
fileSchema *iceberg.Schem
        if err != nil {
                return nil, err
        }
-       defer result.Release()
-
-       return array.RecordFromStructArray(result.(*array.Struct), nil), nil
+       st.Release()
+       out := array.RecordFromStructArray(result.(*array.Struct), nil)
+       result.Release()
+       return out, nil
 }
diff --git a/table/arrow_utils_test.go b/table/arrow_utils_test.go
index 134d34d..058721d 100644
--- a/table/arrow_utils_test.go
+++ b/table/arrow_utils_test.go
@@ -18,6 +18,7 @@
 package table_test
 
 import (
+       "bufio"
        "context"
        "strings"
        "testing"
@@ -114,7 +115,7 @@ func TestArrowToIceberg(t *testing.T) {
                        ElementID:       1,
                        Element:         iceberg.PrimitiveTypes.Int32,
                        ElementRequired: true,
-               }, false, ""},
+               }, true, ""},
                {arrow.LargeListOfField(arrow.Field{
                        Name:     "element",
                        Type:     arrow.PrimitiveTypes.Int32,
@@ -124,7 +125,7 @@ func TestArrowToIceberg(t *testing.T) {
                        ElementID:       1,
                        Element:         iceberg.PrimitiveTypes.Int32,
                        ElementRequired: true,
-               }, true, ""},
+               }, false, ""},
                {arrow.FixedSizeListOfField(1, arrow.Field{
                        Name:     "element",
                        Type:     arrow.PrimitiveTypes.Int32,
@@ -164,7 +165,7 @@ func TestArrowToIceberg(t *testing.T) {
                        }
 
                        if tt.reciprocal {
-                               result, err := table.TypeToArrowType(tt.ice, 
true)
+                               result, err := table.TypeToArrowType(tt.ice, 
true, false)
                                require.NoError(t, err)
                                assert.True(t, arrow.TypeEqual(tt.dt, result), 
tt.dt.String(), result.String())
                        }
@@ -510,3 +511,39 @@ func TestToRequestedSchemaTimestamps(t *testing.T) {
                }
        }
 }
+
+func TestToRequestedSchema(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       schema := arrow.NewSchema([]arrow.Field{
+               {
+                       Name: "nested", Type: arrow.ListOfField(arrow.Field{
+                               Name: "element", Type: 
arrow.PrimitiveTypes.Int32, Nullable: false,
+                               Metadata: 
arrow.NewMetadata([]string{table.ArrowParquetFieldIDKey}, []string{"2"}),
+                       }),
+                       Metadata: 
arrow.NewMetadata([]string{table.ArrowParquetFieldIDKey}, []string{"1"})},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(mem, schema)
+       defer bldr.Release()
+
+       const data = `{"nested": [1, 2, 3]}
+                                 {"nested": [4, 5, 6]}`
+
+       s := bufio.NewScanner(strings.NewReader(data))
+       require.True(t, s.Scan())
+       require.NoError(t, bldr.UnmarshalJSON(s.Bytes()))
+
+       rec := bldr.NewRecord()
+       defer rec.Release()
+
+       icesc, err := table.ArrowSchemaToIceberg(schema, false, nil)
+       require.NoError(t, err)
+
+       rec2, err := table.ToRequestedSchema(context.Background(), icesc, 
icesc, rec, true, true, false)
+       require.NoError(t, err)
+       defer rec2.Release()
+
+       assert.True(t, array.RecordEqual(rec, rec2))
+}
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 1c5d38e..c128f5b 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -149,7 +149,8 @@ func visitParquetManifestStruct[T any](field 
pqarrow.SchemaField, visitor manife
        results := make([]T, len(field.Children))
 
        for i, f := range field.Children {
-               results[i] = visitManifestField(f, visitor)
+               res := visitManifestField(f, visitor)
+               results[i] = visitor.Field(f, res)
        }
 
        return visitor.Struct(field, results)
@@ -189,7 +190,7 @@ func pruneParquetColumns(manifest *pqarrow.SchemaManifest, 
selected map[int]stru
                indices:   []int{},
        }
 
-       result, err := visitParquetManifest[arrow.Field](manifest, visitor)
+       result, err := visitParquetManifest(manifest, visitor)
        if err != nil {
                return nil, nil, err
        }
@@ -335,7 +336,7 @@ func (p *pruneParquetSchema) List(field 
pqarrow.SchemaField, elemResult arrow.Fi
                panic(fmt.Errorf("cannot explicitly project list or map types"))
        }
 
-       p.indices = append(p.indices, field.ColIndex)
+       p.indices = append(p.indices, field.Children[0].ColIndex)
        return *field.Field
 }
 
@@ -371,10 +372,12 @@ func (p *pruneParquetSchema) Map(field 
pqarrow.SchemaField, keyResult, valResult
                panic("cannot explicitly project list or map types")
        }
 
+       p.indices = append(p.indices, field.Children[0].Children[0].ColIndex)
+       p.indices = append(p.indices, field.Children[0].Children[1].ColIndex)
        return *field.Field
 }
 
-func (p *pruneParquetSchema) Primitive(field pqarrow.SchemaField) arrow.Field {
+func (p *pruneParquetSchema) Primitive(_ pqarrow.SchemaField) arrow.Field {
        return arrow.Field{}
 }
 
diff --git a/table/scanner_test.go b/table/scanner_test.go
index 761d941..a3ab031 100644
--- a/table/scanner_test.go
+++ b/table/scanner_test.go
@@ -458,6 +458,23 @@ func (s *ScannerSuite) TestPartitionedTables() {
        }
 }
 
+func (s *ScannerSuite) TestNestedColumns() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       ident := catalog.ToIdentifier("default", "test_all_types")
+
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
+
+       ctx := compute.WithAllocator(s.ctx, mem)
+       results, err := tbl.Scan().ToArrowTable(ctx)
+       s.Require().NoError(err)
+       defer results.Release()
+
+       s.EqualValues(5, results.NumRows())
+}
+
 func (s *ScannerSuite) TestUnpartitionedUUIDTable() {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(s.T(), 0)

Reply via email to