This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d3b5b4d fix: support INT32/INT64 physical types for decimal columns 
(#686)
2d3b5b4d is described below

commit 2d3b5b4d48751ffe55993a76a4227a8fe9b9e949
Author: Karthic Rao <[email protected]>
AuthorDate: Thu Jan 22 22:53:08 2026 +0530

    fix: support INT32/INT64 physical types for decimal columns (#686)
    
    ## Summary
    
    Parquet decimals can be stored using multiple physical types depending
    on precision:
    - `INT32` for precision <= 9
    - `INT64` for precision <= 18
    - `FIXED_LEN_BYTE_ARRAY` for any precision
    - `BYTE_ARRAY` for any precision
    
    The previous implementation only accepted `FIXED_LEN_BYTE_ARRAY` for all
decimals and rejected valid Parquet files with an error:
    
    ```
    unexpected physical type INT32 for decimal(7, 2), expected 
FIXED_LEN_BYTE_ARRAY
    ```
    
    This caused `AddFiles` to fail when importing datasets (like TPC-DS)
    that use INT32/INT64 for small-precision decimals, which is valid per
    the Parquet specification.
    
    ## Changes
    
    - Refactors `createStatsAgg` to switch on Iceberg logical type first,
    then handle physical representations (matches iceberg-java's
    `ParquetConversions.java` approach)
    - For `DecimalType`, accepts all valid Parquet physical types
    - Updates `DataFileStatsFromMeta` to handle INT32/INT64 decimal
    statistics
    - Adds `wrappedDecByteArrayStats` for BYTE_ARRAY encoded decimals
    
    ## Test plan
    
    - [x] Existing tests pass
    - [x] Build succeeds
    - [x] Tested with TPC-DS parquet files that use INT32 decimals
---
 table/internal/parquet_files.go      | 121 ++++++++++++++++++---------
 table/internal/parquet_files_test.go | 155 +++++++++++++++++++++++++++++++++++
 2 files changed, 235 insertions(+), 41 deletions(-)

diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 7ed2f309..8e7cf252 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -101,59 +101,66 @@ func (parquetFormat) PathToIDMapping(sc *iceberg.Schema) 
(map[string]int, error)
 }
 
 func (p parquetFormat) createStatsAgg(typ iceberg.PrimitiveType, 
physicalTypeStr string, truncLen int) (StatsAgg, error) {
-       expectedPhysical := p.PrimitiveTypeToPhysicalType(typ)
-       if physicalTypeStr != expectedPhysical {
-               switch {
-               case physicalTypeStr == "INT32" && expectedPhysical == "INT64":
-               case physicalTypeStr == "FLOAT" && expectedPhysical == "DOUBLE":
-               default:
-                       return nil, fmt.Errorf("unexpected physical type %s for 
%s, expected %s",
-                               physicalTypeStr, typ, expectedPhysical)
-               }
-       }
-
-       switch physicalTypeStr {
-       case "BOOLEAN":
+       // Switch on Iceberg logical type first, then handle physical 
representations.
+       // This matches iceberg-java's approach in ParquetConversions.java.
+       switch typ.(type) {
+       case iceberg.BooleanType:
                return newStatAgg[bool](typ, truncLen), nil
-       case "INT32":
-               switch typ.(type) {
-               case iceberg.DecimalType:
-                       return &decAsIntAgg[int32]{
-                               newStatAgg[int32](typ, 
truncLen).(*statsAggregator[int32]),
-                       }, nil
-               }
 
+       case iceberg.Int32Type, iceberg.DateType:
                return newStatAgg[int32](typ, truncLen), nil
-       case "INT64":
-               switch typ.(type) {
-               case iceberg.DecimalType:
-                       return &decAsIntAgg[int64]{
-                               newStatAgg[int64](typ, 
truncLen).(*statsAggregator[int64]),
-                       }, nil
+
+       case iceberg.Int64Type, iceberg.TimeType, iceberg.TimestampType, 
iceberg.TimestampTzType:
+               // Allow INT32 physical for INT64 logical (promotion)
+               if physicalTypeStr == "INT32" {
+                       return newStatAgg[int32](typ, truncLen), nil
                }
 
                return newStatAgg[int64](typ, truncLen), nil
-       case "FLOAT":
+
+       case iceberg.Float32Type:
                return newStatAgg[float32](typ, truncLen), nil
-       case "DOUBLE":
+
+       case iceberg.Float64Type:
+               // Allow FLOAT physical for DOUBLE logical (promotion)
+               if physicalTypeStr == "FLOAT" {
+                       return newStatAgg[float32](typ, truncLen), nil
+               }
+
                return newStatAgg[float64](typ, truncLen), nil
-       case "FIXED_LEN_BYTE_ARRAY":
-               switch typ.(type) {
-               case iceberg.UUIDType:
-                       return newStatAgg[uuid.UUID](typ, truncLen), nil
-               case iceberg.DecimalType:
+
+       case iceberg.StringType:
+               return newStatAgg[string](typ, truncLen), nil
+
+       case iceberg.BinaryType:
+               return newStatAgg[[]byte](typ, truncLen), nil
+
+       case iceberg.UUIDType:
+               return newStatAgg[uuid.UUID](typ, truncLen), nil
+
+       case iceberg.FixedType:
+               return newStatAgg[[]byte](typ, truncLen), nil
+
+       case iceberg.DecimalType:
+               // Decimals can be stored as INT32 (precision <= 9), INT64 
(precision <= 18),
+               // FIXED_LEN_BYTE_ARRAY, or BYTE_ARRAY per Parquet spec.
+               switch physicalTypeStr {
+               case "INT32":
+                       return &decAsIntAgg[int32]{
+                               newStatAgg[int32](typ, 
truncLen).(*statsAggregator[int32]),
+                       }, nil
+               case "INT64":
+                       return &decAsIntAgg[int64]{
+                               newStatAgg[int64](typ, 
truncLen).(*statsAggregator[int64]),
+                       }, nil
+               case "FIXED_LEN_BYTE_ARRAY", "BYTE_ARRAY":
                        return newStatAgg[iceberg.Decimal](typ, truncLen), nil
                default:
-                       return newStatAgg[[]byte](typ, truncLen), nil
-               }
-       case "BYTE_ARRAY":
-               if typ.Equals(iceberg.PrimitiveTypes.String) {
-                       return newStatAgg[string](typ, truncLen), nil
+                       return nil, fmt.Errorf("unsupported physical type %s 
for decimal", physicalTypeStr)
                }
 
-               return newStatAgg[[]byte](typ, truncLen), nil
        default:
-               return nil, fmt.Errorf("unsupported physical type: %s", 
physicalTypeStr)
+               return nil, fmt.Errorf("unsupported iceberg type: %s", typ)
        }
 }
 
@@ -400,6 +407,29 @@ func (w wrappedDecStats) Max() iceberg.Decimal {
        return iceberg.Decimal{Val: dec, Scale: w.scale}
 }
 
+type wrappedDecByteArrayStats struct {
+       *metadata.ByteArrayStatistics
+       scale int
+}
+
+func (w wrappedDecByteArrayStats) Min() iceberg.Decimal {
+       dec, err := BigEndianToDecimal(w.ByteArrayStatistics.Min())
+       if err != nil {
+               panic(err)
+       }
+
+       return iceberg.Decimal{Val: dec, Scale: w.scale}
+}
+
+func (w wrappedDecByteArrayStats) Max() iceberg.Decimal {
+       dec, err := BigEndianToDecimal(w.ByteArrayStatistics.Max())
+       if err != nil {
+               panic(err)
+       }
+
+       return iceberg.Decimal{Val: dec, Scale: w.scale}
+}
+
 func (p parquetFormat) DataFileStatsFromMeta(meta Metadata, statsCols 
map[int]StatisticsCollector, colMapping map[string]int) *DataFileStatistics {
        pqmeta := meta.(*metadata.FileMetaData)
        var (
@@ -487,7 +517,16 @@ func (p parquetFormat) DataFileStatsFromMeta(meta 
Metadata, statsCols map[int]St
                        case iceberg.FixedType:
                                stats = 
&wrappedFLBAStats{stats.(*metadata.FixedLenByteArrayStatistics)}
                        case iceberg.DecimalType:
-                               stats = 
&wrappedDecStats{stats.(*metadata.FixedLenByteArrayStatistics), t.Scale()}
+                               // Decimals can be stored as INT32/INT64 (small 
precision) or FIXED_LEN_BYTE_ARRAY/BYTE_ARRAY.
+                               // Only wrap FIXED_LEN_BYTE_ARRAY and 
BYTE_ARRAY statistics; INT32/INT64 stats
+                               // are used directly by decAsIntAgg.
+                               switch s := stats.(type) {
+                               case *metadata.FixedLenByteArrayStatistics:
+                                       stats = &wrappedDecStats{s, t.Scale()}
+                               case *metadata.ByteArrayStatistics:
+                                       stats = &wrappedDecByteArrayStats{s, 
t.Scale()}
+                                       // INT32/INT64 statistics are used 
directly by decAsIntAgg
+                               }
                        }
 
                        agg.Update(stats)
diff --git a/table/internal/parquet_files_test.go 
b/table/internal/parquet_files_test.go
index 47772cc8..85480da0 100644
--- a/table/internal/parquet_files_test.go
+++ b/table/internal/parquet_files_test.go
@@ -20,6 +20,7 @@ package internal_test
 import (
        "bytes"
        "context"
+       "fmt"
        "math/big"
        "strings"
        "testing"
@@ -33,6 +34,7 @@ import (
        "github.com/apache/arrow-go/v18/parquet/file"
        "github.com/apache/arrow-go/v18/parquet/metadata"
        "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/arrow-go/v18/parquet/schema"
        "github.com/apache/iceberg-go"
        internal2 "github.com/apache/iceberg-go/internal"
        "github.com/apache/iceberg-go/table"
@@ -330,6 +332,159 @@ func TestMetricsPrimitiveTypes(t *testing.T) {
        })
 }
 
+// TestDecimalPhysicalTypes tests that decimals stored as INT32/INT64 physical 
types
+// are correctly handled. This is important because Parquet allows decimals 
with
+// precision <= 9 to be stored as INT32, and precision <= 18 as INT64.
+func TestDecimalPhysicalTypes(t *testing.T) {
+       format := internal.GetFileFormat(iceberg.ParquetFile)
+
+       tests := []struct {
+               name         string
+               precision    int
+               scale        int
+               physicalType parquet.Type
+               values       []int64 // unscaled values
+               expectedMin  int64
+               expectedMax  int64
+       }{
+               {
+                       name:         "decimal_as_int32",
+                       precision:    7,
+                       scale:        2,
+                       physicalType: parquet.Types.Int32,
+                       values:       []int64{12345, 67890}, // represents 
123.45, 678.90
+                       expectedMin:  12345,
+                       expectedMax:  67890,
+               },
+               {
+                       name:         "decimal_as_int64",
+                       precision:    15,
+                       scale:        2,
+                       physicalType: parquet.Types.Int64,
+                       values:       []int64{123456789012345, 987654321098765},
+                       expectedMin:  123456789012345,
+                       expectedMax:  987654321098765,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Create a Parquet file with decimal stored as INT32 
or INT64
+                       var buf bytes.Buffer
+
+                       // Build a custom schema with decimal logical type
+                       decType := 
schema.NewDecimalLogicalType(int32(tt.precision), int32(tt.scale))
+                       var node schema.Node
+                       var err error
+                       if tt.physicalType == parquet.Types.Int32 {
+                               node, err = 
schema.NewPrimitiveNodeLogical("value", parquet.Repetitions.Required,
+                                       decType, parquet.Types.Int32, 0, 1)
+                       } else {
+                               node, err = 
schema.NewPrimitiveNodeLogical("value", parquet.Repetitions.Required,
+                                       decType, parquet.Types.Int64, 0, 1)
+                       }
+                       require.NoError(t, err)
+
+                       rootNode, err := schema.NewGroupNode("schema", 
parquet.Repetitions.Required, schema.FieldList{node}, -1)
+                       require.NoError(t, err)
+
+                       // Write the parquet file
+                       writer := file.NewParquetWriter(&buf,
+                               rootNode,
+                               
file.WithWriterProps(parquet.NewWriterProperties(parquet.WithStats(true))))
+
+                       rgw := writer.AppendRowGroup()
+                       colWriter, err := rgw.NextColumn()
+                       require.NoError(t, err)
+
+                       if tt.physicalType == parquet.Types.Int32 {
+                               int32Writer := 
colWriter.(*file.Int32ColumnChunkWriter)
+                               vals := make([]int32, len(tt.values))
+                               for i, v := range tt.values {
+                                       vals[i] = int32(v)
+                               }
+                               _, err = int32Writer.WriteBatch(vals, nil, nil)
+                       } else {
+                               int64Writer := 
colWriter.(*file.Int64ColumnChunkWriter)
+                               _, err = int64Writer.WriteBatch(tt.values, nil, 
nil)
+                       }
+                       require.NoError(t, err)
+
+                       require.NoError(t, colWriter.Close())
+                       require.NoError(t, rgw.Close())
+                       require.NoError(t, writer.Close())
+
+                       // Read back and get metadata
+                       rdr, err := 
file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+                       require.NoError(t, err)
+                       defer rdr.Close()
+
+                       meta := rdr.MetaData()
+
+                       // Create table metadata with decimal type
+                       tableMeta, err := 
table.ParseMetadataString(fmt.Sprintf(`{
+                               "format-version": 2,
+                               "location": "s3://bucket/test/location",
+                               "last-column-id": 1,
+                               "current-schema-id": 0,
+                               "schemas": [
+                                       {
+                                               "type": "struct",
+                                               "schema-id": 0,
+                                               "fields": [
+                                                       {"id": 1, "name": 
"value", "required": true, "type": "decimal(%d, %d)"}
+                                               ]
+                                       }
+                               ],
+                               "last-partition-id": 0,
+                               "last-updated-ms": -1,
+                               "default-spec-id": 0,
+                               "default-sort-order-id": 0,
+                               "sort-orders": [{"order-id": 0, "fields": []}],
+                               "partition-specs": [{"spec-id": 0, "fields": 
[]}],
+                               "properties": {}
+                       }`, tt.precision, tt.scale))
+                       require.NoError(t, err)
+
+                       mapping, err := 
format.PathToIDMapping(tableMeta.CurrentSchema())
+                       require.NoError(t, err)
+
+                       collector := map[int]internal.StatisticsCollector{
+                               1: {
+                                       FieldID:    1,
+                                       Mode:       internal.MetricsMode{Typ: 
internal.MetricModeFull},
+                                       ColName:    "value",
+                                       IcebergTyp: 
iceberg.DecimalTypeOf(tt.precision, tt.scale),
+                               },
+                       }
+
+                       // This should not panic - the fix allows INT32/INT64 
physical types for decimals
+                       stats := 
format.DataFileStatsFromMeta(internal.Metadata(meta), collector, mapping)
+                       require.NotNil(t, stats)
+
+                       df := stats.ToDataFile(tableMeta.CurrentSchema(), 
tableMeta.PartitionSpec(), "test.parquet",
+                               iceberg.ParquetFile, meta.GetSourceFileSize(), 
nil)
+
+                       // Verify bounds are correctly extracted
+                       require.Contains(t, df.LowerBoundValues(), 1)
+                       require.Contains(t, df.UpperBoundValues(), 1)
+
+                       // Verify the actual values
+                       minLit, err := 
iceberg.LiteralFromBytes(iceberg.DecimalTypeOf(tt.precision, tt.scale), 
df.LowerBoundValues()[1])
+                       require.NoError(t, err)
+                       minDec := 
minLit.(iceberg.TypedLiteral[iceberg.Decimal]).Value()
+                       assert.Equal(t, uint64(tt.expectedMin), 
minDec.Val.LowBits())
+                       assert.Equal(t, tt.scale, minDec.Scale)
+
+                       maxLit, err := 
iceberg.LiteralFromBytes(iceberg.DecimalTypeOf(tt.precision, tt.scale), 
df.UpperBoundValues()[1])
+                       require.NoError(t, err)
+                       maxDec := 
maxLit.(iceberg.TypedLiteral[iceberg.Decimal]).Value()
+                       assert.Equal(t, uint64(tt.expectedMax), 
maxDec.Val.LowBits())
+                       assert.Equal(t, tt.scale, maxDec.Scale)
+               })
+       }
+}
+
 func TestWriteDataFileErrOnClose(t *testing.T) {
        ctx := context.Background()
        fm := internal.GetFileFormat(iceberg.ParquetFile)

Reply via email to