This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 07a6a6ac perf(table): optimize partitioned write throughput (#622)
07a6a6ac is described below

commit 07a6a6acee26e47e17d48c97a803e852387a3a89
Author: Alex <[email protected]>
AuthorDate: Mon Nov 17 11:42:47 2025 -0700

    perf(table): optimize partitioned write throughput (#622)
    
    # Partitioned Write Optimizations
    
    ## Summary
    
    This PR delivers significant performance improvements to the partitioned
    write throughput in the Iceberg table writer. Through a series of
    iterative optimizations, we achieved substantial gains in write
    performance, reduced memory allocations, and improved overall
    efficiency.
    
    ## Performance Results
    
    Note: the following results were obtained using the latest commit on
    `main` of arrow-go
    (https://github.com/apache/arrow-go/commit/3160eef9c227d94db67bfaf5225a2d6c1f48bc76).
    
    The following benchmarks were conducted on Apple M3 Max (darwin/arm64)
    for the new `BenchmarkPartitionedWriteThroughput` test with 2.5M rows
    per write operation:
    
    ### Incremental Improvements
    
    | Change | Time/op | Δ Time | records/sec | Δ records/sec | allocs/op | Δ allocs/op |
    |--------|---------|---------|-------------|---------------|-----------|-------------|
    | Base | 2.35 s | - | 1,065,115 | - | 60,076,290 | - |
    | Change 1 | 1.49 s | -36.5% | 1,677,802 | +57.5% | 35,076,376 | -41.6% |
    | Change 2 | 1.21 s | -18.7% | 2,064,629 | +23.1% | 25,076,562 | -28.5% |
    | Change 3 | 1.16 s | -4.5% | 2,161,545 | +4.7% | 22,576,588 | -10.0% |
    | Change 4 | 1.07 s | -7.4% | 2,334,700 | +8.0% | 20,076,480 | -11.1% |
    | Change 5 | 654.1 ms | -38.9% | 3,821,892 | +63.7% | 12,577,119 | -37.4% |
    
    ### Overall Improvement (Base → Change 5)
    
    **2.5M Records per write:**
    
    | Metric | Base | Change 5 | Improvement |
    |--------|------|----------|-------------|
    | **Time/op** | 2.35 s | 654.1 ms | **-72.1%** ⚡ |
    | **records/sec** | 1,065,115 | 3,821,892 | **+258.9%** 🚀 |
    | **allocs/op** | 60,076,290 | 12,577,119 | **-79.1%** 💪 |
    
    **100K Records per write:**
    
    | Metric | Base | Change 5 | Improvement |
    |--------|------|----------|-------------|
    | **Time/op** | 221.3 ms | 137.7 ms | **-37.8%** |
    | **records/sec** | 451,853 | 726,085 | **+60.7%** |
    | **allocs/op** | 2,472,792 | 573,012 | **-76.8%** |
    
    ## Impact
    
    These optimizations significantly improve the performance of partitioned
    writes in Iceberg tables, making the library more efficient for
    high-throughput data ingestion scenarios. The improvements scale well
    with larger workloads, as demonstrated by the dramatic gains in the 2.5M
    record benchmark results.
    
    ## Testing
    
    All optimizations were validated through comprehensive benchmarking on
    darwin/arm64 (Apple M3 Max). The improvements are consistent across
    different record volumes (100K, 500K, and 2.5M records).
---
 go.mod                                     |   2 +-
 go.sum                                     |   4 +-
 partitions.go                              |  78 ++++++++--
 partitions_bench_test.go                   | 222 +++++++++++++++++++++++++++++
 partitions_test.go                         |   4 +-
 table/partitioned_fanout_writer.go         | 184 ++++++++++++++++++------
 table/partitioned_throughput_bench_test.go | 183 ++++++++++++++++++++++++
 transforms.go                              |   2 +
 8 files changed, 620 insertions(+), 59 deletions(-)

diff --git a/go.mod b/go.mod
index c30b4a3e..5fb67087 100644
--- a/go.mod
+++ b/go.mod
@@ -23,7 +23,7 @@ require (
        cloud.google.com/go/storage v1.57.2
        github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1
        github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.3
-       github.com/apache/arrow-go/v18 v18.4.2-0.20251022172928-a489ef0147d6
+       github.com/apache/arrow-go/v18 v18.4.2-0.20251108194323-3160eef9c227
        github.com/aws/aws-sdk-go-v2 v1.39.6
        github.com/aws/aws-sdk-go-v2/config v1.31.20
        github.com/aws/aws-sdk-go-v2/credentials v1.18.24
diff --git a/go.sum b/go.sum
index 552f2f07..992a6cf3 100644
--- a/go.sum
+++ b/go.sum
@@ -95,8 +95,8 @@ github.com/andybalholm/brotli v1.2.0 
h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwTo
 github.com/andybalholm/brotli v1.2.0/go.mod 
h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
 github.com/antlr4-go/antlr/v4 v4.13.1 
h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
 github.com/antlr4-go/antlr/v4 v4.13.1/go.mod 
h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
-github.com/apache/arrow-go/v18 v18.4.2-0.20251022172928-a489ef0147d6 
h1:kqqktu1fXE+DUjlXntoYe1wATKo/4CoMQ805dDX4Zpo=
-github.com/apache/arrow-go/v18 v18.4.2-0.20251022172928-a489ef0147d6/go.mod 
h1:YfS5yWtFXamxhBvVOKgjrrLQ7YV86vn2RCCgrKdgfbU=
+github.com/apache/arrow-go/v18 v18.4.2-0.20251108194323-3160eef9c227 
h1:hiRpYiLRp3c69BsnjgE2dAGdx6sQyfw6Btvto1Z+t1s=
+github.com/apache/arrow-go/v18 v18.4.2-0.20251108194323-3160eef9c227/go.mod 
h1:YfS5yWtFXamxhBvVOKgjrrLQ7YV86vn2RCCgrKdgfbU=
 github.com/apache/thrift v0.22.0 
h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc=
 github.com/apache/thrift v0.22.0/go.mod 
h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJeynQ+/g=
 github.com/apparentlymart/go-textseg/v15 v15.0.0 
h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY=
diff --git a/partitions.go b/partitions.go
index 598adf1e..c3f4d2e3 100644
--- a/partitions.go
+++ b/partitions.go
@@ -23,7 +23,6 @@ import (
        "fmt"
        "iter"
        "net/url"
-       "path"
        "slices"
        "strings"
 )
@@ -50,6 +49,26 @@ type PartitionField struct {
        Name string `json:"name"`
        // Transform is the transform used to produce the partition value
        Transform Transform `json:"transform"`
+
+       // escapedName is a cached URL-escaped version of Name for performance
+       // This is populated during initialization and not serialized
+       escapedName string
+}
+
+// EscapedName returns the URL-escaped version of the partition field name.
+func (p *PartitionField) EscapedName() string {
+       if p.escapedName == "" {
+               p.escapedName = url.QueryEscape(p.Name)
+       }
+
+       return p.escapedName
+}
+
+func (p PartitionField) Equals(other PartitionField) bool {
+       return p.SourceID == other.SourceID &&
+               p.FieldID == other.FieldID &&
+               p.Name == other.Name &&
+               p.Transform.Equals(other.Transform)
 }
 
 func (p *PartitionField) String() string {
@@ -85,6 +104,11 @@ type PartitionSpec struct {
 
        // this is populated by initialize after creation
        sourceIdToFields map[int][]PartitionField
+
+       // partitionTypeCache caches the result of PartitionType for a given 
schema ID
+       // Key is schema ID, value is the cached StructType
+       // This avoids rebuilding the partition type on every row during writes
+       partitionTypeCache map[int]*StructType
 }
 
 type PartitionOption func(*PartitionSpec) error
@@ -344,8 +368,10 @@ func (ps *PartitionSpec) UnmarshalJSON(b []byte) error {
 
 func (ps *PartitionSpec) initialize() {
        ps.sourceIdToFields = make(map[int][]PartitionField)
-       for _, f := range ps.fields {
-               ps.sourceIdToFields[f.SourceID] = 
append(ps.sourceIdToFields[f.SourceID], f)
+       ps.partitionTypeCache = make(map[int]*StructType)
+
+       for i := range ps.fields {
+               ps.sourceIdToFields[ps.fields[i].SourceID] = 
append(ps.sourceIdToFields[ps.fields[i].SourceID], ps.fields[i])
        }
 }
 
@@ -419,6 +445,18 @@ func (ps *PartitionSpec) LastAssignedFieldID() int {
 // and only parittion spec that uses a required source column will never be
 // null, but it doesn't seem worth tracking this case.
 func (ps *PartitionSpec) PartitionType(schema *Schema) *StructType {
+       // Initialize cache if needed (for PartitionSpecs created without 
NewPartitionSpecOpts)
+       if ps.partitionTypeCache == nil {
+               ps.partitionTypeCache = make(map[int]*StructType)
+       }
+
+       // Check cache first using schema ID as key
+       schemaID := schema.ID
+       if cached, ok := ps.partitionTypeCache[schemaID]; ok {
+               return cached
+       }
+
+       // Build partition type if not cached
        nestedFields := []NestedField{}
        for _, field := range ps.fields {
                sourceType, ok := schema.FindTypeByID(field.SourceID)
@@ -434,7 +472,10 @@ func (ps *PartitionSpec) PartitionType(schema *Schema) 
*StructType {
                })
        }
 
-       return &StructType{FieldList: nestedFields}
+       result := &StructType{FieldList: nestedFields}
+       ps.partitionTypeCache[schemaID] = result
+
+       return result
 }
 
 // PartitionToPath produces a proper partition path from the data and schema by
@@ -447,15 +488,34 @@ func (ps *PartitionSpec) PartitionType(schema *Schema) 
*StructType {
 func (ps *PartitionSpec) PartitionToPath(data structLike, sc *Schema) string {
        partType := ps.PartitionType(sc)
 
-       segments := make([]string, 0, len(partType.FieldList))
+       if len(partType.FieldList) == 0 {
+               return ""
+       }
+
+       // Use strings.Builder for efficient string concatenation
+       // Estimate capacity: escaped_name + "=" + escaped_value + "/" per field
+       var sb strings.Builder
+       estimatedSize := 0
        for i := range partType.Fields() {
-               valueStr := ps.fields[i].Transform.ToHumanStr(data.Get(i))
+               estimatedSize += len(ps.fields[i].EscapedName()) + 20 // name + 
"=" + avg value + "/"
+       }
+       sb.Grow(estimatedSize)
+
+       for i := range partType.Fields() {
+               if i > 0 {
+                       sb.WriteByte('/')
+               }
+
+               // Use pre-escaped field name (now guaranteed to be initialized)
+               sb.WriteString(ps.fields[i].EscapedName())
+               sb.WriteByte('=')
 
-               segments = append(segments, fmt.Sprintf("%s=%s",
-                       url.QueryEscape(ps.fields[i].Name), 
url.QueryEscape(valueStr)))
+               // Only escape the value (which changes per row)
+               valueStr := ps.fields[i].Transform.ToHumanStr(data.Get(i))
+               sb.WriteString(url.QueryEscape(valueStr))
        }
 
-       return path.Join(segments...)
+       return sb.String()
 }
 
 // GeneratePartitionFieldName returns default partition field name based on 
field transform type
diff --git a/partitions_bench_test.go b/partitions_bench_test.go
new file mode 100644
index 00000000..e170b981
--- /dev/null
+++ b/partitions_bench_test.go
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg_test
+
+import (
+       "testing"
+
+       "github.com/apache/iceberg-go"
+)
+
+// BenchmarkPartitionToPath benchmarks the optimized PartitionToPath function
+// which uses cached URL-escaped field names and strings.Builder for efficient
+// string concatenation.
+func BenchmarkPartitionToPath(b *testing.B) {
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "str", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 2, Name: "other_str", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "int", Type: 
iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 4, Name: "date", Type: 
iceberg.PrimitiveTypes.Date, Required: true},
+               iceberg.NestedField{ID: 5, Name: "ts", Type: 
iceberg.PrimitiveTypes.Timestamp, Required: true},
+       )
+
+       // Create a partition spec with fields that need URL escaping
+       spec := iceberg.NewPartitionSpecID(1,
+               iceberg.PartitionField{
+                       SourceID: 1, FieldID: 1000,
+                       Transform: iceberg.TruncateTransform{Width: 19}, Name: 
"my#str%bucket",
+               },
+               iceberg.PartitionField{
+                       SourceID: 2, FieldID: 1001,
+                       Transform: iceberg.IdentityTransform{}, Name: "other 
str+bucket",
+               },
+               iceberg.PartitionField{
+                       SourceID: 3, FieldID: 1002,
+                       Transform: iceberg.BucketTransform{NumBuckets: 25}, 
Name: "my!int:bucket",
+               },
+               iceberg.PartitionField{
+                       SourceID: 4, FieldID: 1003,
+                       Transform: iceberg.DayTransform{}, Name: "date/day",
+               },
+               iceberg.PartitionField{
+                       SourceID: 5, FieldID: 1004,
+                       Transform: iceberg.HourTransform{}, Name: "ts/hour",
+               },
+       )
+
+       record := partitionRecord{"my+str", "( )", int32(10), int32(18993), 
int64(1672531200000000)}
+
+       b.ResetTimer()
+       b.ReportAllocs()
+       for i := 0; i < b.N; i++ {
+               _ = spec.PartitionToPath(record, schema)
+       }
+}
+
+// BenchmarkPartitionToPathManyFields benchmarks PartitionToPath with many 
partition fields
+// to verify performance scales well with the number of fields.
+func BenchmarkPartitionToPathManyFields(b *testing.B) {
+       // Create a schema with many fields
+       fields := make([]iceberg.NestedField, 0, 20)
+       partitionFields := make([]iceberg.PartitionField, 0, 20)
+       recordValues := make([]any, 0, 20)
+
+       for i := 1; i <= 20; i++ {
+               fields = append(fields, iceberg.NestedField{
+                       ID: i, Name: "field_" + string(rune('a'+i-1)),
+                       Type: iceberg.PrimitiveTypes.String, Required: false,
+               })
+               partitionFields = append(partitionFields, 
iceberg.PartitionField{
+                       SourceID: i, FieldID: 1000 + i,
+                       Transform: iceberg.IdentityTransform{}, Name: 
"part_field_" + string(rune('a'+i-1)),
+               })
+               recordValues = append(recordValues, 
"value_"+string(rune('a'+i-1)))
+       }
+
+       schema := iceberg.NewSchema(0, fields...)
+       spec := iceberg.NewPartitionSpecID(1, partitionFields...)
+       record := partitionRecord(recordValues)
+
+       b.ResetTimer()
+       b.ReportAllocs()
+       for i := 0; i < b.N; i++ {
+               _ = spec.PartitionToPath(record, schema)
+       }
+}
+
+// BenchmarkPartitionType benchmarks the PartitionType function with caching.
+// The first call should be slower (builds the type), subsequent calls should
+// be faster (uses cache).
+func BenchmarkPartitionType(b *testing.B) {
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "str", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 2, Name: "int", Type: 
iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 3, Name: "bool", Type: 
iceberg.PrimitiveTypes.Bool, Required: false},
+       )
+
+       spec := iceberg.NewPartitionSpecID(1,
+               iceberg.PartitionField{
+                       SourceID: 1, FieldID: 1000,
+                       Transform: iceberg.TruncateTransform{Width: 19}, Name: 
"str_truncate",
+               },
+               iceberg.PartitionField{
+                       SourceID: 2, FieldID: 1001,
+                       Transform: iceberg.BucketTransform{NumBuckets: 25}, 
Name: "int_bucket",
+               },
+               iceberg.PartitionField{
+                       SourceID: 3, FieldID: 1002,
+                       Transform: iceberg.IdentityTransform{}, Name: 
"bool_identity",
+               },
+       )
+
+       b.ResetTimer()
+       b.ReportAllocs()
+       for i := 0; i < b.N; i++ {
+               _ = spec.PartitionType(schema)
+       }
+}
+
+// BenchmarkPartitionTypeMultipleSchemas benchmarks PartitionType with multiple
+// different schemas to verify caching works correctly per schema ID.
+func BenchmarkPartitionTypeMultipleSchemas(b *testing.B) {
+       schemas := make([]*iceberg.Schema, 10)
+       for i := range schemas {
+               schemas[i] = iceberg.NewSchema(i,
+                       iceberg.NestedField{ID: 1, Name: "str", Type: 
iceberg.PrimitiveTypes.String},
+                       iceberg.NestedField{ID: 2, Name: "int", Type: 
iceberg.PrimitiveTypes.Int32, Required: true},
+               )
+       }
+
+       spec := iceberg.NewPartitionSpecID(1,
+               iceberg.PartitionField{
+                       SourceID: 1, FieldID: 1000,
+                       Transform: iceberg.IdentityTransform{}, Name: 
"str_identity",
+               },
+               iceberg.PartitionField{
+                       SourceID: 2, FieldID: 1001,
+                       Transform: iceberg.BucketTransform{NumBuckets: 10}, 
Name: "int_bucket",
+               },
+       )
+
+       b.ResetTimer()
+       b.ReportAllocs()
+       for i := 0; i < b.N; i++ {
+               schema := schemas[i%len(schemas)]
+               _ = spec.PartitionType(schema)
+       }
+}
+
+// BenchmarkPartitionToPathRepeated benchmarks repeated calls to 
PartitionToPath
+// with the same spec to verify that cached escaped names provide performance 
benefits.
+func BenchmarkPartitionToPathRepeated(b *testing.B) {
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "str", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 2, Name: "other_str", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "int", Type: 
iceberg.PrimitiveTypes.Int32, Required: true},
+       )
+
+       // Use field names that require URL escaping to maximize the benefit of 
caching
+       spec := iceberg.NewPartitionSpecID(1,
+               iceberg.PartitionField{
+                       SourceID: 1, FieldID: 1000,
+                       Transform: iceberg.TruncateTransform{Width: 19}, Name: 
"my#str%bucket",
+               },
+               iceberg.PartitionField{
+                       SourceID: 2, FieldID: 1001,
+                       Transform: iceberg.IdentityTransform{}, Name: "other 
str+bucket",
+               },
+               iceberg.PartitionField{
+                       SourceID: 3, FieldID: 1002,
+                       Transform: iceberg.BucketTransform{NumBuckets: 25}, 
Name: "my!int:bucket",
+               },
+       )
+
+       // Create multiple records with different values
+       records := []partitionRecord{
+               {"my+str", "( )", int32(10)},
+               {"another/value", "test data", int32(20)},
+               {"third&record", "more data", int32(30)},
+               {"fourth=record", "even more", int32(40)},
+               {"fifth?record", "last one", int32(50)},
+       }
+
+       b.ResetTimer()
+       b.ReportAllocs()
+       for i := 0; i < b.N; i++ {
+               record := records[i%len(records)]
+               _ = spec.PartitionToPath(record, schema)
+       }
+}
+
+// BenchmarkPartitionSpecInitialize benchmarks the initialization of a 
partition spec
+// which now pre-computes URL-escaped field names.
+func BenchmarkPartitionSpecInitialize(b *testing.B) {
+       fields := make([]iceberg.PartitionField, 10)
+       for i := range fields {
+               fields[i] = iceberg.PartitionField{
+                       SourceID: i + 1, FieldID: 1000 + i,
+                       Transform: iceberg.IdentityTransform{}, Name: 
"field#with%special&chars=" + string(rune('a'+i)),
+               }
+       }
+
+       b.ResetTimer()
+       b.ReportAllocs()
+       for i := 0; i < b.N; i++ {
+               _ = iceberg.NewPartitionSpecID(1, fields...)
+       }
+}
diff --git a/partitions_test.go b/partitions_test.go
index f980aab1..8a04aed2 100644
--- a/partitions_test.go
+++ b/partitions_test.go
@@ -37,7 +37,7 @@ func TestPartitionSpec(t *testing.T) {
 
        assert.Zero(t, spec1.ID())
        assert.Equal(t, 1, spec1.NumFields())
-       assert.Equal(t, idField1, spec1.Field(0))
+       assert.True(t, idField1.Equals(spec1.Field(0)))
        assert.NotEqual(t, idField1, spec1)
        assert.False(t, spec1.IsUnpartitioned())
        assert.True(t, spec1.CompatibleWith(&spec1))
@@ -53,7 +53,7 @@ func TestPartitionSpec(t *testing.T) {
 
        assert.False(t, spec1.Equals(spec2))
        assert.True(t, spec1.CompatibleWith(&spec2))
-       assert.Equal(t, []iceberg.PartitionField{idField1}, 
spec1.FieldsBySourceID(3))
+       assert.True(t, idField1.Equals(spec1.FieldsBySourceID(3)[0]))
        assert.Empty(t, spec1.FieldsBySourceID(1925))
 
        spec3 := iceberg.NewPartitionSpec(idField1, idField2)
diff --git a/table/partitioned_fanout_writer.go 
b/table/partitioned_fanout_writer.go
index a4b41d3c..d0c366af 100644
--- a/table/partitioned_fanout_writer.go
+++ b/table/partitioned_fanout_writer.go
@@ -47,6 +47,7 @@ type partitionedFanoutWriter struct {
 type partitionInfo struct {
        rows            []int64
        partitionValues map[int]any
+       partitionRec    partitionRecord // The actual partition values for 
generating the path
 }
 
 // NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the 
specified
@@ -117,12 +118,12 @@ func (p *partitionedFanoutWriter) fanout(ctx 
context.Context, inputRecordsCh <-c
                        }
                        defer record.Release()
 
-                       partitionMap, err := p.getPartitionMap(record)
+                       partitions, err := p.getPartitions(record)
                        if err != nil {
                                return err
                        }
 
-                       for partition, val := range partitionMap {
+                       for _, val := range partitions {
                                select {
                                case <-ctx.Done():
                                        return context.Cause(ctx)
@@ -134,7 +135,8 @@ func (p *partitionedFanoutWriter) fanout(ctx 
context.Context, inputRecordsCh <-c
                                        return err
                                }
 
-                               rollingDataWriter, err := 
p.writers.getOrCreateRollingDataWriter(ctx, partition, val.partitionValues, 
dataFilesChannel)
+                               partitionPath := 
p.partitionPath(val.partitionRec)
+                               rollingDataWriter, err := 
p.writers.getOrCreateRollingDataWriter(ctx, partitionPath, val.partitionValues, 
dataFilesChannel)
                                if err != nil {
                                        return err
                                }
@@ -174,8 +176,8 @@ func (p *partitionedFanoutWriter) 
yieldDataFiles(fanoutWorkers *errgroup.Group,
        }
 }
 
-func (p *partitionedFanoutWriter) getPartitionMap(record arrow.RecordBatch) 
(map[string]partitionInfo, error) {
-       partitionMap := make(map[string]partitionInfo)
+func (p *partitionedFanoutWriter) getPartitions(record arrow.RecordBatch) 
([]*partitionInfo, error) {
+       partitionMap := newPartitionMapNode()
        partitionFields := p.partitionSpec.PartitionType(p.schema).FieldList
        partitionRec := make(partitionRecord, len(partitionFields))
 
@@ -197,7 +199,6 @@ func (p *partitionedFanoutWriter) getPartitionMap(record 
arrow.RecordBatch) (map
        }
 
        for row := range record.NumRows() {
-               partitionValues := make(map[int]any)
                for i := range partitionFields {
                        col := partitionColumns[i]
                        if !col.IsNull(int(row)) {
@@ -210,22 +211,102 @@ func (p *partitionedFanoutWriter) getPartitionMap(record 
arrow.RecordBatch) (map
                                transformedLiteral := 
sourceField.Transform.Apply(iceberg.Optional[iceberg.Literal]{Valid: true, Val: 
val})
                                if transformedLiteral.Valid {
                                        partitionRec[i] = 
transformedLiteral.Val.Any()
-                                       partitionValues[sourceField.FieldID] = 
transformedLiteral.Val.Any()
                                } else {
-                                       partitionRec[i], 
partitionValues[sourceField.FieldID] = nil, nil
+                                       partitionRec[i] = nil
                                }
                        } else {
-                               partitionRec[i], 
partitionValues[partitionFieldsInfo[i].fieldID] = nil, nil
+                               partitionRec[i] = nil
                        }
                }
-               partitionKey := p.partitionPath(partitionRec)
-               partVal := partitionMap[partitionKey]
-               partVal.rows = append(partitionMap[partitionKey].rows, row)
-               partVal.partitionValues = partitionValues
-               partitionMap[partitionKey] = partVal
+
+               // Get or create partition info for this partition key
+               partVal := partitionMap.getOrCreate(partitionRec, 
partitionFieldsInfo)
+               partVal.rows = append(partVal.rows, row)
+       }
+
+       return partitionMap.collectPartitions(), nil
+}
+
+// partitionMapNode represents a simple tree structure for storing 
partitionInfo.
+//
+// Each key is the partition value at that level of the tree, and the key 
hierarchy
+// is in the order of the partition spec.
+// The value is either a *partitionMapNode or a *partitionInfo.
+type partitionMapNode struct {
+       children  map[any]any
+       leafCount int
+}
+
+func newPartitionMapNode() *partitionMapNode {
+       return &partitionMapNode{
+               children:  make(map[any]any),
+               leafCount: 0,
+       }
+}
+
+// getOrCreate navigates the tree and returns the partitionInfo for the given 
partition key,
+// creating nodes along the way if they don't exist
+func (n *partitionMapNode) getOrCreate(partitionRec partitionRecord, fieldInfo 
[]struct {
+       sourceField *iceberg.PartitionField
+       fieldID     int
+},
+) *partitionInfo {
+       // Navigate through all but the last partition field
+       node := n
+       for _, part := range partitionRec[:len(partitionRec)-1] {
+               val, ok := node.children[part]
+               if !ok {
+                       newNode := newPartitionMapNode()
+                       node.children[part] = newNode
+                       node = newNode
+               } else {
+                       node = val.(*partitionMapNode)
+               }
+       }
+
+       // Last level stores the actual partitionInfo
+       lastKey := partitionRec[len(partitionRec)-1]
+       partVal, ok := node.children[lastKey].(*partitionInfo)
+       if ok {
+               return partVal
+       }
+
+       // First time seeing this partition - create partitionValues map
+       partitionValues := make(map[int]any, len(partitionRec))
+
+       // Copy partitionRec values so they don't get overwritten
+       partRecCopy := make(partitionRecord, len(partitionRec))
+       for i := range partitionRec {
+               partitionValues[fieldInfo[i].fieldID] = partitionRec[i]
+               partRecCopy[i] = partitionRec[i]
+       }
+
+       partVal = &partitionInfo{
+               rows:            make([]int64, 0, 128), // modest starting 
capacity
+               partitionValues: partitionValues,
+               partitionRec:    partRecCopy,
+       }
+       node.children[lastKey] = partVal
+       node.leafCount++
+
+       return partVal
+}
+
+// collectPartitions recursively collects all partitionInfo into a slice
+func (n *partitionMapNode) collectPartitions() []*partitionInfo {
+       result := make([]*partitionInfo, 0, n.leafCount)
+
+       for _, v := range n.children {
+               switch node := v.(type) {
+               case *partitionInfo:
+                       result = append(result, node)
+               case *partitionMapNode:
+                       // Recursively collect from child nodes
+                       result = append(result, node.collectPartitions()...)
+               }
        }
 
-       return partitionMap, nil
+       return result
 }
 
 type partitionBatchFn func(arrow.RecordBatch, []int64) (arrow.RecordBatch, 
error)
@@ -297,37 +378,50 @@ func getArrowValueAsIcebergLiteral(column arrow.Array, 
row int) (iceberg.Literal
        case *extensions.UUIDArray:
 
                return iceberg.NewLiteral(arr.Value(row)), nil
+
+       case *array.String:
+
+               return iceberg.NewLiteral(arr.Value(row)), nil
+       case *array.Int64:
+
+               return iceberg.NewLiteral(arr.Value(row)), nil
+       case *array.Int32:
+
+               return iceberg.NewLiteral(arr.Value(row)), nil
+       case *array.Int16:
+
+               return iceberg.NewLiteral(int32(arr.Value(row))), nil
+       case *array.Int8:
+
+               return iceberg.NewLiteral(int32(arr.Value(row))), nil
+       case *array.Uint64:
+
+               return iceberg.NewLiteral(int64(arr.Value(row))), nil
+       case *array.Uint32:
+
+               return iceberg.NewLiteral(int32(arr.Value(row))), nil
+       case *array.Uint16:
+
+               return iceberg.NewLiteral(int32(arr.Value(row))), nil
+       case *array.Uint8:
+
+               return iceberg.NewLiteral(int32(arr.Value(row))), nil
+       case *array.Float32:
+
+               return iceberg.NewLiteral(arr.Value(row)), nil
+       case *array.Float64:
+
+               return iceberg.NewLiteral(arr.Value(row)), nil
+       case *array.Boolean:
+
+               return iceberg.NewLiteral(arr.Value(row)), nil
+       case *array.Binary:
+
+               return iceberg.NewLiteral(arr.Value(row)), nil
+
        default:
                val := column.GetOneForMarshal(row)
-               switch v := val.(type) {
-               case bool:
-                       return iceberg.NewLiteral(v), nil
-               case int8:
-                       return iceberg.NewLiteral(int32(v)), nil
-               case uint8:
-                       return iceberg.NewLiteral(int32(v)), nil
-               case int16:
-                       return iceberg.NewLiteral(int32(v)), nil
-               case uint16:
-                       return iceberg.NewLiteral(int32(v)), nil
-               case int32:
-                       return iceberg.NewLiteral(v), nil
-               case uint32:
-                       return iceberg.NewLiteral(int32(v)), nil
-               case int64:
-                       return iceberg.NewLiteral(v), nil
-               case uint64:
-                       return iceberg.NewLiteral(int64(v)), nil
-               case float32:
-                       return iceberg.NewLiteral(v), nil
-               case float64:
-                       return iceberg.NewLiteral(v), nil
-               case string:
-                       return iceberg.NewLiteral(v), nil
-               case []byte:
-                       return iceberg.NewLiteral(v), nil
-               default:
-                       return nil, fmt.Errorf("unsupported value type: %T", v)
-               }
+
+               return nil, fmt.Errorf("unsupported value type: %T", val)
        }
 }
diff --git a/table/partitioned_throughput_bench_test.go 
b/table/partitioned_throughput_bench_test.go
new file mode 100644
index 00000000..76527236
--- /dev/null
+++ b/table/partitioned_throughput_bench_test.go
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "path/filepath"
+       "testing"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/table"
+)
+
+// BenchmarkPartitionedWriteThroughput benchmarks the full table.Append() path
+func BenchmarkPartitionedWriteThroughput(b *testing.B) {
+       ctx := context.Background()
+       mem := memory.NewGoAllocator()
+
+       // Define Iceberg schema matching reproducer
+       icebergSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "ts", Type: iceberg.PrimitiveTypes.TimestampTz, Required: true},
+               iceberg.NestedField{ID: 3, Name: "host", Type: iceberg.PrimitiveTypes.String, Required: true},
+               iceberg.NestedField{ID: 4, Name: "status_code", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 5, Name: "bytes_sent", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 6, Name: "user_agent", Type: iceberg.PrimitiveTypes.String, Required: true},
+       )
+
+       // Define Arrow schema (must match Iceberg schema types)
+       arrSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "ts", Type: &arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "UTC"}, Nullable: false},
+               {Name: "host", Type: arrow.BinaryTypes.String, Nullable: false},
+               {Name: "status_code", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
+               {Name: "bytes_sent", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "user_agent", Type: arrow.BinaryTypes.String, Nullable: false},
+       }, nil)
+
+       // Define partition spec (partitioned by day(ts) and host)
+       spec := iceberg.NewPartitionSpecID(1,
+               iceberg.PartitionField{SourceID: 2, FieldID: 1000, Transform: iceberg.DayTransform{}, Name: "ts_day"},
+               iceberg.PartitionField{SourceID: 3, FieldID: 1001, Transform: iceberg.IdentityTransform{}, Name: "host"},
+       )
+
+       // Helper to create a batch of records
+       createBatch := func(numRecords int) arrow.RecordBatch {
+               idB := array.NewInt64Builder(mem)
+               tsB := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "UTC"})
+               hostB := array.NewStringBuilder(mem)
+               statusB := array.NewInt32Builder(mem)
+               bytesB := array.NewInt64Builder(mem)
+               uaB := array.NewStringBuilder(mem)
+
+               defer idB.Release()
+               defer tsB.Release()
+               defer hostB.Release()
+               defer statusB.Release()
+               defer bytesB.Release()
+               defer uaB.Release()
+
+               baseTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
+               hosts := []string{"example.com", "foo.test.org", "demo.net"}
+               userAgents := []string{"Mozilla/5.0", "curl/7.68.0", "python-requests/2.25.1"}
+
+               for i := 0; i < numRecords; i++ {
+                       idB.Append(int64(i))
+                       // Spread timestamps across multiple days to create multiple partitions
+                       ts := baseTime.Add(time.Duration(i%10) * 24 * time.Hour)
+                       tsB.Append(arrow.Timestamp(ts.UnixMicro()))
+                       hostB.Append(hosts[i%len(hosts)])
+                       statusB.Append(200 + int32(i%5))
+                       bytesB.Append(int64(1000 + i%5000))
+                       uaB.Append(userAgents[i%len(userAgents)])
+               }
+
+               return array.NewRecordBatch(arrSchema, []arrow.Array{
+                       idB.NewArray(),
+                       tsB.NewArray(),
+                       hostB.NewArray(),
+                       statusB.NewArray(),
+                       bytesB.NewArray(),
+                       uaB.NewArray(),
+               }, int64(numRecords))
+       }
+
+       // Run benchmarks with different batch sizes
+       benchSizes := []struct {
+               name       string
+               numRecords int
+       }{
+               {"100K_records", 100_000},
+               {"500K_records", 500_000},
+               {"2.5M_records", 2_500_000},
+       }
+
+       for _, bs := range benchSizes {
+               b.Run(bs.name, func(b *testing.B) {
+                       // Setup warehouse directory and catalog
+                       loc := filepath.ToSlash(b.TempDir())
+
+                       // Create in-memory SQL catalog
+                       cat, err := catalog.Load(ctx, "benchmark", iceberg.Properties{
+                               "type":        "sql",
+                               "uri":         ":memory:",
+                               "sql.dialect": "sqlite",
+                               "sql.driver":  "sqlite",
+                               "warehouse":   "file://" + loc,
+                       })
+                       if err != nil {
+                               b.Fatalf("Failed to create catalog: %v", err)
+                       }
+
+                       // Create namespace
+                       ns := table.Identifier{"benchmark"}
+                       err = cat.CreateNamespace(ctx, ns, iceberg.Properties{})
+                       if err != nil {
+                               b.Fatalf("Failed to create namespace: %v", err)
+                       }
+
+                       // Create table with partition spec
+                       tableID := table.Identifier{"benchmark", "partitioned_table"}
+                       tbl, err := cat.CreateTable(ctx, tableID, icebergSchema,
+                               catalog.WithPartitionSpec(&spec),
+                       )
+                       if err != nil {
+                               b.Fatalf("Failed to create table: %v", err)
+                       }
+
+                       // Pre-create batch to avoid measuring batch creation time
+                       testBatch := createBatch(bs.numRecords)
+                       defer testBatch.Release()
+
+                       b.ResetTimer()
+                       b.ReportAllocs()
+
+                       totalRecords := int64(0)
+                       for i := 0; i < b.N; i++ {
+                               reader, err := array.NewRecordReader(arrSchema, []arrow.RecordBatch{testBatch})
+                               if err != nil {
+                                       b.Fatalf("Failed to create reader: %v", err)
+                               }
+
+                               newTable, err := tbl.Append(ctx, reader, iceberg.Properties{})
+                               if err != nil {
+                                       reader.Release()
+                                       b.Fatalf("Append error: %v", err)
+                               }
+
+                               reader.Release()
+                               tbl = newTable
+                               totalRecords += int64(bs.numRecords)
+                       }
+
+                       b.StopTimer()
+
+                       // Report throughput
+                       recordsPerOp := float64(bs.numRecords)
+                       recordsPerSec := recordsPerOp / (b.Elapsed().Seconds() / float64(b.N))
+                       b.ReportMetric(recordsPerSec, "records/sec")
+                       b.ReportMetric(float64(totalRecords), "total_records")
+               })
+       }
+}
diff --git a/transforms.go b/transforms.go
index e2ce3f6b..18e799fb 100644
--- a/transforms.go
+++ b/transforms.go
@@ -127,6 +127,8 @@ func (IdentityTransform) ToHumanStr(val any) string {
        switch v := val.(type) {
        case nil:
                return "null"
+       case string:
+               return v
        case []byte:
                return base64.StdEncoding.EncodeToString(v)
        case bool:

Reply via email to