This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 07a6a6ac perf(table): optimize partitioned write throughput (#622)
07a6a6ac is described below
commit 07a6a6acee26e47e17d48c97a803e852387a3a89
Author: Alex <[email protected]>
AuthorDate: Mon Nov 17 11:42:47 2025 -0700
perf(table): optimize partitioned write throughput (#622)
# Partitioned Write Optimizations
## Summary
This PR delivers significant performance improvements to the partitioned
write throughput in the Iceberg table writer. Through a series of
iterative optimizations, we achieved substantial gains in write
performance, reduced memory allocations, and improved overall
efficiency.
## Performance Results
Note: the following results are using the latest commit on `main` of
arrow-go
(https://github.com/apache/arrow-go/commit/3160eef9c227d94db67bfaf5225a2d6c1f48bc76)
The following benchmarks were conducted on Apple M3 Max (darwin/arm64)
for the new `BenchmarkPartitionedWriteThroughput` test with 2.5M rows
per write operation:
### Incremental Improvements
| Change | Time/op | Δ Time | records/sec | Δ records/sec | allocs/op | Δ allocs/op |
|--------|---------|---------|-------------|---------------|-----------|-------------|
| Base | 2.35 s | - | 1,065,115 | - | 60,076,290 | - |
| Change 1 | 1.49 s | -36.5% | 1,677,802 | +57.5% | 35,076,376 | -41.6% |
| Change 2 | 1.21 s | -18.7% | 2,064,629 | +23.1% | 25,076,562 | -28.5% |
| Change 3 | 1.16 s | -4.5% | 2,161,545 | +4.7% | 22,576,588 | -10.0% |
| Change 4 | 1.07 s | -7.4% | 2,334,700 | +8.0% | 20,076,480 | -11.1% |
| Change 5 | 654.1 ms | -38.9% | 3,821,892 | +63.7% | 12,577,119 | -37.4% |
### Overall Improvement (Base → Change 5)
**2.5M Records per write:**
| Metric | Base | Change 5 | Improvement |
|--------|------|----------|-------------|
| **Time/op** | 2.35 s | 654.1 ms | **-72.1%** ⚡ |
| **records/sec** | 1,065,115 | 3,821,892 | **+258.9%** 🚀 |
| **allocs/op** | 60,076,290 | 12,577,119 | **-79.1%** 💪 |
**100K Records per write:**
| Metric | Base | Change 5 | Improvement |
|--------|------|----------|-------------|
| **Time/op** | 221.3 ms | 137.7 ms | **-37.8%** |
| **records/sec** | 451,853 | 726,085 | **+60.7%** |
| **allocs/op** | 2,472,792 | 573,012 | **-76.8%** |
## Impact
These optimizations significantly improve the performance of partitioned
writes in Iceberg tables, making the library more efficient for
high-throughput data ingestion scenarios. The improvements scale well
with larger workloads, as demonstrated by the dramatic gains in the 2.5M
record benchmark results.
## Testing
All optimizations were validated through comprehensive benchmarking on
darwin/arm64 (Apple M3 Max). The improvements are consistent across
different record volumes (100K, 500K, and 2.5M records).
---
go.mod | 2 +-
go.sum | 4 +-
partitions.go | 78 ++++++++--
partitions_bench_test.go | 222 +++++++++++++++++++++++++++++
partitions_test.go | 4 +-
table/partitioned_fanout_writer.go | 184 ++++++++++++++++++------
table/partitioned_throughput_bench_test.go | 183 ++++++++++++++++++++++++
transforms.go | 2 +
8 files changed, 620 insertions(+), 59 deletions(-)
diff --git a/go.mod b/go.mod
index c30b4a3e..5fb67087 100644
--- a/go.mod
+++ b/go.mod
@@ -23,7 +23,7 @@ require (
cloud.google.com/go/storage v1.57.2
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.3
- github.com/apache/arrow-go/v18 v18.4.2-0.20251022172928-a489ef0147d6
+ github.com/apache/arrow-go/v18 v18.4.2-0.20251108194323-3160eef9c227
github.com/aws/aws-sdk-go-v2 v1.39.6
github.com/aws/aws-sdk-go-v2/config v1.31.20
github.com/aws/aws-sdk-go-v2/credentials v1.18.24
diff --git a/go.sum b/go.sum
index 552f2f07..992a6cf3 100644
--- a/go.sum
+++ b/go.sum
@@ -95,8 +95,8 @@ github.com/andybalholm/brotli v1.2.0
h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwTo
github.com/andybalholm/brotli v1.2.0/go.mod
h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/antlr4-go/antlr/v4 v4.13.1
h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod
h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
-github.com/apache/arrow-go/v18 v18.4.2-0.20251022172928-a489ef0147d6
h1:kqqktu1fXE+DUjlXntoYe1wATKo/4CoMQ805dDX4Zpo=
-github.com/apache/arrow-go/v18 v18.4.2-0.20251022172928-a489ef0147d6/go.mod
h1:YfS5yWtFXamxhBvVOKgjrrLQ7YV86vn2RCCgrKdgfbU=
+github.com/apache/arrow-go/v18 v18.4.2-0.20251108194323-3160eef9c227
h1:hiRpYiLRp3c69BsnjgE2dAGdx6sQyfw6Btvto1Z+t1s=
+github.com/apache/arrow-go/v18 v18.4.2-0.20251108194323-3160eef9c227/go.mod
h1:YfS5yWtFXamxhBvVOKgjrrLQ7YV86vn2RCCgrKdgfbU=
github.com/apache/thrift v0.22.0
h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc=
github.com/apache/thrift v0.22.0/go.mod
h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJeynQ+/g=
github.com/apparentlymart/go-textseg/v15 v15.0.0
h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY=
diff --git a/partitions.go b/partitions.go
index 598adf1e..c3f4d2e3 100644
--- a/partitions.go
+++ b/partitions.go
@@ -23,7 +23,6 @@ import (
"fmt"
"iter"
"net/url"
- "path"
"slices"
"strings"
)
@@ -50,6 +49,26 @@ type PartitionField struct {
Name string `json:"name"`
// Transform is the transform used to produce the partition value
Transform Transform `json:"transform"`
+
+ // escapedName is a cached URL-escaped version of Name for performance
+ // This is populated during initialization and not serialized
+ escapedName string
+}
+
+// EscapedName returns the URL-escaped version of the partition field name.
+func (p *PartitionField) EscapedName() string {
+ if p.escapedName == "" {
+ p.escapedName = url.QueryEscape(p.Name)
+ }
+
+ return p.escapedName
+}
+
+func (p PartitionField) Equals(other PartitionField) bool {
+ return p.SourceID == other.SourceID &&
+ p.FieldID == other.FieldID &&
+ p.Name == other.Name &&
+ p.Transform.Equals(other.Transform)
}
func (p *PartitionField) String() string {
@@ -85,6 +104,11 @@ type PartitionSpec struct {
// this is populated by initialize after creation
sourceIdToFields map[int][]PartitionField
+
+ // partitionTypeCache caches the result of PartitionType for a given
schema ID
+ // Key is schema ID, value is the cached StructType
+ // This avoids rebuilding the partition type on every row during writes
+ partitionTypeCache map[int]*StructType
}
type PartitionOption func(*PartitionSpec) error
@@ -344,8 +368,10 @@ func (ps *PartitionSpec) UnmarshalJSON(b []byte) error {
func (ps *PartitionSpec) initialize() {
ps.sourceIdToFields = make(map[int][]PartitionField)
- for _, f := range ps.fields {
- ps.sourceIdToFields[f.SourceID] =
append(ps.sourceIdToFields[f.SourceID], f)
+ ps.partitionTypeCache = make(map[int]*StructType)
+
+ for i := range ps.fields {
+ ps.sourceIdToFields[ps.fields[i].SourceID] =
append(ps.sourceIdToFields[ps.fields[i].SourceID], ps.fields[i])
}
}
@@ -419,6 +445,18 @@ func (ps *PartitionSpec) LastAssignedFieldID() int {
// and only parittion spec that uses a required source column will never be
// null, but it doesn't seem worth tracking this case.
func (ps *PartitionSpec) PartitionType(schema *Schema) *StructType {
+ // Initialize cache if needed (for PartitionSpecs created without
NewPartitionSpecOpts)
+ if ps.partitionTypeCache == nil {
+ ps.partitionTypeCache = make(map[int]*StructType)
+ }
+
+ // Check cache first using schema ID as key
+ schemaID := schema.ID
+ if cached, ok := ps.partitionTypeCache[schemaID]; ok {
+ return cached
+ }
+
+ // Build partition type if not cached
nestedFields := []NestedField{}
for _, field := range ps.fields {
sourceType, ok := schema.FindTypeByID(field.SourceID)
@@ -434,7 +472,10 @@ func (ps *PartitionSpec) PartitionType(schema *Schema)
*StructType {
})
}
- return &StructType{FieldList: nestedFields}
+ result := &StructType{FieldList: nestedFields}
+ ps.partitionTypeCache[schemaID] = result
+
+ return result
}
// PartitionToPath produces a proper partition path from the data and schema by
@@ -447,15 +488,34 @@ func (ps *PartitionSpec) PartitionType(schema *Schema)
*StructType {
func (ps *PartitionSpec) PartitionToPath(data structLike, sc *Schema) string {
partType := ps.PartitionType(sc)
- segments := make([]string, 0, len(partType.FieldList))
+ if len(partType.FieldList) == 0 {
+ return ""
+ }
+
+ // Use strings.Builder for efficient string concatenation
+ // Estimate capacity: escaped_name + "=" + escaped_value + "/" per field
+ var sb strings.Builder
+ estimatedSize := 0
for i := range partType.Fields() {
- valueStr := ps.fields[i].Transform.ToHumanStr(data.Get(i))
+ estimatedSize += len(ps.fields[i].EscapedName()) + 20 // name +
"=" + avg value + "/"
+ }
+ sb.Grow(estimatedSize)
+
+ for i := range partType.Fields() {
+ if i > 0 {
+ sb.WriteByte('/')
+ }
+
+ // Use pre-escaped field name (now guaranteed to be initialized)
+ sb.WriteString(ps.fields[i].EscapedName())
+ sb.WriteByte('=')
- segments = append(segments, fmt.Sprintf("%s=%s",
- url.QueryEscape(ps.fields[i].Name),
url.QueryEscape(valueStr)))
+ // Only escape the value (which changes per row)
+ valueStr := ps.fields[i].Transform.ToHumanStr(data.Get(i))
+ sb.WriteString(url.QueryEscape(valueStr))
}
- return path.Join(segments...)
+ return sb.String()
}
// GeneratePartitionFieldName returns default partition field name based on
field transform type
diff --git a/partitions_bench_test.go b/partitions_bench_test.go
new file mode 100644
index 00000000..e170b981
--- /dev/null
+++ b/partitions_bench_test.go
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iceberg_test
+
+import (
+ "testing"
+
+ "github.com/apache/iceberg-go"
+)
+
+// BenchmarkPartitionToPath benchmarks the optimized PartitionToPath function
+// which uses cached URL-escaped field names and strings.Builder for efficient
+// string concatenation.
+func BenchmarkPartitionToPath(b *testing.B) {
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "str", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "other_str", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 3, Name: "int", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 4, Name: "date", Type:
iceberg.PrimitiveTypes.Date, Required: true},
+ iceberg.NestedField{ID: 5, Name: "ts", Type:
iceberg.PrimitiveTypes.Timestamp, Required: true},
+ )
+
+ // Create a partition spec with fields that need URL escaping
+ spec := iceberg.NewPartitionSpecID(1,
+ iceberg.PartitionField{
+ SourceID: 1, FieldID: 1000,
+ Transform: iceberg.TruncateTransform{Width: 19}, Name:
"my#str%bucket",
+ },
+ iceberg.PartitionField{
+ SourceID: 2, FieldID: 1001,
+ Transform: iceberg.IdentityTransform{}, Name: "other
str+bucket",
+ },
+ iceberg.PartitionField{
+ SourceID: 3, FieldID: 1002,
+ Transform: iceberg.BucketTransform{NumBuckets: 25},
Name: "my!int:bucket",
+ },
+ iceberg.PartitionField{
+ SourceID: 4, FieldID: 1003,
+ Transform: iceberg.DayTransform{}, Name: "date/day",
+ },
+ iceberg.PartitionField{
+ SourceID: 5, FieldID: 1004,
+ Transform: iceberg.HourTransform{}, Name: "ts/hour",
+ },
+ )
+
+ record := partitionRecord{"my+str", "( )", int32(10), int32(18993),
int64(1672531200000000)}
+
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ _ = spec.PartitionToPath(record, schema)
+ }
+}
+
+// BenchmarkPartitionToPathManyFields benchmarks PartitionToPath with many
partition fields
+// to verify performance scales well with the number of fields.
+func BenchmarkPartitionToPathManyFields(b *testing.B) {
+ // Create a schema with many fields
+ fields := make([]iceberg.NestedField, 0, 20)
+ partitionFields := make([]iceberg.PartitionField, 0, 20)
+ recordValues := make([]any, 0, 20)
+
+ for i := 1; i <= 20; i++ {
+ fields = append(fields, iceberg.NestedField{
+ ID: i, Name: "field_" + string(rune('a'+i-1)),
+ Type: iceberg.PrimitiveTypes.String, Required: false,
+ })
+ partitionFields = append(partitionFields,
iceberg.PartitionField{
+ SourceID: i, FieldID: 1000 + i,
+ Transform: iceberg.IdentityTransform{}, Name:
"part_field_" + string(rune('a'+i-1)),
+ })
+ recordValues = append(recordValues,
"value_"+string(rune('a'+i-1)))
+ }
+
+ schema := iceberg.NewSchema(0, fields...)
+ spec := iceberg.NewPartitionSpecID(1, partitionFields...)
+ record := partitionRecord(recordValues)
+
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ _ = spec.PartitionToPath(record, schema)
+ }
+}
+
+// BenchmarkPartitionType benchmarks the PartitionType function with caching.
+// The first call should be slower (builds the type), subsequent calls should
+// be faster (uses cache).
+func BenchmarkPartitionType(b *testing.B) {
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "str", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "int", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 3, Name: "bool", Type:
iceberg.PrimitiveTypes.Bool, Required: false},
+ )
+
+ spec := iceberg.NewPartitionSpecID(1,
+ iceberg.PartitionField{
+ SourceID: 1, FieldID: 1000,
+ Transform: iceberg.TruncateTransform{Width: 19}, Name:
"str_truncate",
+ },
+ iceberg.PartitionField{
+ SourceID: 2, FieldID: 1001,
+ Transform: iceberg.BucketTransform{NumBuckets: 25},
Name: "int_bucket",
+ },
+ iceberg.PartitionField{
+ SourceID: 3, FieldID: 1002,
+ Transform: iceberg.IdentityTransform{}, Name:
"bool_identity",
+ },
+ )
+
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ _ = spec.PartitionType(schema)
+ }
+}
+
+// BenchmarkPartitionTypeMultipleSchemas benchmarks PartitionType with multiple
+// different schemas to verify caching works correctly per schema ID.
+func BenchmarkPartitionTypeMultipleSchemas(b *testing.B) {
+ schemas := make([]*iceberg.Schema, 10)
+ for i := range schemas {
+ schemas[i] = iceberg.NewSchema(i,
+ iceberg.NestedField{ID: 1, Name: "str", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "int", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+ }
+
+ spec := iceberg.NewPartitionSpecID(1,
+ iceberg.PartitionField{
+ SourceID: 1, FieldID: 1000,
+ Transform: iceberg.IdentityTransform{}, Name:
"str_identity",
+ },
+ iceberg.PartitionField{
+ SourceID: 2, FieldID: 1001,
+ Transform: iceberg.BucketTransform{NumBuckets: 10},
Name: "int_bucket",
+ },
+ )
+
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ schema := schemas[i%len(schemas)]
+ _ = spec.PartitionType(schema)
+ }
+}
+
+// BenchmarkPartitionToPathRepeated benchmarks repeated calls to
PartitionToPath
+// with the same spec to verify that cached escaped names provide performance
benefits.
+func BenchmarkPartitionToPathRepeated(b *testing.B) {
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "str", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "other_str", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 3, Name: "int", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+
+ // Use field names that require URL escaping to maximize the benefit of
caching
+ spec := iceberg.NewPartitionSpecID(1,
+ iceberg.PartitionField{
+ SourceID: 1, FieldID: 1000,
+ Transform: iceberg.TruncateTransform{Width: 19}, Name:
"my#str%bucket",
+ },
+ iceberg.PartitionField{
+ SourceID: 2, FieldID: 1001,
+ Transform: iceberg.IdentityTransform{}, Name: "other
str+bucket",
+ },
+ iceberg.PartitionField{
+ SourceID: 3, FieldID: 1002,
+ Transform: iceberg.BucketTransform{NumBuckets: 25},
Name: "my!int:bucket",
+ },
+ )
+
+ // Create multiple records with different values
+ records := []partitionRecord{
+ {"my+str", "( )", int32(10)},
+ {"another/value", "test data", int32(20)},
+ {"third&record", "more data", int32(30)},
+ {"fourth=record", "even more", int32(40)},
+ {"fifth?record", "last one", int32(50)},
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ record := records[i%len(records)]
+ _ = spec.PartitionToPath(record, schema)
+ }
+}
+
+// BenchmarkPartitionSpecInitialize benchmarks the initialization of a
partition spec
+// which now pre-computes URL-escaped field names.
+func BenchmarkPartitionSpecInitialize(b *testing.B) {
+ fields := make([]iceberg.PartitionField, 10)
+ for i := range fields {
+ fields[i] = iceberg.PartitionField{
+ SourceID: i + 1, FieldID: 1000 + i,
+ Transform: iceberg.IdentityTransform{}, Name:
"field#with%special&chars=" + string(rune('a'+i)),
+ }
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ _ = iceberg.NewPartitionSpecID(1, fields...)
+ }
+}
diff --git a/partitions_test.go b/partitions_test.go
index f980aab1..8a04aed2 100644
--- a/partitions_test.go
+++ b/partitions_test.go
@@ -37,7 +37,7 @@ func TestPartitionSpec(t *testing.T) {
assert.Zero(t, spec1.ID())
assert.Equal(t, 1, spec1.NumFields())
- assert.Equal(t, idField1, spec1.Field(0))
+ assert.True(t, idField1.Equals(spec1.Field(0)))
assert.NotEqual(t, idField1, spec1)
assert.False(t, spec1.IsUnpartitioned())
assert.True(t, spec1.CompatibleWith(&spec1))
@@ -53,7 +53,7 @@ func TestPartitionSpec(t *testing.T) {
assert.False(t, spec1.Equals(spec2))
assert.True(t, spec1.CompatibleWith(&spec2))
- assert.Equal(t, []iceberg.PartitionField{idField1},
spec1.FieldsBySourceID(3))
+ assert.True(t, idField1.Equals(spec1.FieldsBySourceID(3)[0]))
assert.Empty(t, spec1.FieldsBySourceID(1925))
spec3 := iceberg.NewPartitionSpec(idField1, idField2)
diff --git a/table/partitioned_fanout_writer.go
b/table/partitioned_fanout_writer.go
index a4b41d3c..d0c366af 100644
--- a/table/partitioned_fanout_writer.go
+++ b/table/partitioned_fanout_writer.go
@@ -47,6 +47,7 @@ type partitionedFanoutWriter struct {
type partitionInfo struct {
rows []int64
partitionValues map[int]any
+ partitionRec partitionRecord // The actual partition values for
generating the path
}
// NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the
specified
@@ -117,12 +118,12 @@ func (p *partitionedFanoutWriter) fanout(ctx
context.Context, inputRecordsCh <-c
}
defer record.Release()
- partitionMap, err := p.getPartitionMap(record)
+ partitions, err := p.getPartitions(record)
if err != nil {
return err
}
- for partition, val := range partitionMap {
+ for _, val := range partitions {
select {
case <-ctx.Done():
return context.Cause(ctx)
@@ -134,7 +135,8 @@ func (p *partitionedFanoutWriter) fanout(ctx
context.Context, inputRecordsCh <-c
return err
}
- rollingDataWriter, err :=
p.writers.getOrCreateRollingDataWriter(ctx, partition, val.partitionValues,
dataFilesChannel)
+ partitionPath :=
p.partitionPath(val.partitionRec)
+ rollingDataWriter, err :=
p.writers.getOrCreateRollingDataWriter(ctx, partitionPath, val.partitionValues,
dataFilesChannel)
if err != nil {
return err
}
@@ -174,8 +176,8 @@ func (p *partitionedFanoutWriter)
yieldDataFiles(fanoutWorkers *errgroup.Group,
}
}
-func (p *partitionedFanoutWriter) getPartitionMap(record arrow.RecordBatch)
(map[string]partitionInfo, error) {
- partitionMap := make(map[string]partitionInfo)
+func (p *partitionedFanoutWriter) getPartitions(record arrow.RecordBatch)
([]*partitionInfo, error) {
+ partitionMap := newPartitionMapNode()
partitionFields := p.partitionSpec.PartitionType(p.schema).FieldList
partitionRec := make(partitionRecord, len(partitionFields))
@@ -197,7 +199,6 @@ func (p *partitionedFanoutWriter) getPartitionMap(record
arrow.RecordBatch) (map
}
for row := range record.NumRows() {
- partitionValues := make(map[int]any)
for i := range partitionFields {
col := partitionColumns[i]
if !col.IsNull(int(row)) {
@@ -210,22 +211,102 @@ func (p *partitionedFanoutWriter) getPartitionMap(record
arrow.RecordBatch) (map
transformedLiteral :=
sourceField.Transform.Apply(iceberg.Optional[iceberg.Literal]{Valid: true, Val:
val})
if transformedLiteral.Valid {
partitionRec[i] =
transformedLiteral.Val.Any()
- partitionValues[sourceField.FieldID] =
transformedLiteral.Val.Any()
} else {
- partitionRec[i],
partitionValues[sourceField.FieldID] = nil, nil
+ partitionRec[i] = nil
}
} else {
- partitionRec[i],
partitionValues[partitionFieldsInfo[i].fieldID] = nil, nil
+ partitionRec[i] = nil
}
}
- partitionKey := p.partitionPath(partitionRec)
- partVal := partitionMap[partitionKey]
- partVal.rows = append(partitionMap[partitionKey].rows, row)
- partVal.partitionValues = partitionValues
- partitionMap[partitionKey] = partVal
+
+ // Get or create partition info for this partition key
+ partVal := partitionMap.getOrCreate(partitionRec,
partitionFieldsInfo)
+ partVal.rows = append(partVal.rows, row)
+ }
+
+ return partitionMap.collectPartitions(), nil
+}
+
+// partitionMapNode represents a simple tree structure for storing
partitionInfo.
+//
+// Each key is the partition value at that level of the tree, and the key
hierarchy
+// is in the order of the partition spec.
+// The value is either a *partitionMapNode or a *partitionInfo.
+type partitionMapNode struct {
+ children map[any]any
+ leafCount int
+}
+
+func newPartitionMapNode() *partitionMapNode {
+ return &partitionMapNode{
+ children: make(map[any]any),
+ leafCount: 0,
+ }
+}
+
+// getOrCreate navigates the tree and returns the partitionInfo for the given
partition key,
+// creating nodes along the way if they don't exist
+func (n *partitionMapNode) getOrCreate(partitionRec partitionRecord, fieldInfo
[]struct {
+ sourceField *iceberg.PartitionField
+ fieldID int
+},
+) *partitionInfo {
+ // Navigate through all but the last partition field
+ node := n
+ for _, part := range partitionRec[:len(partitionRec)-1] {
+ val, ok := node.children[part]
+ if !ok {
+ newNode := newPartitionMapNode()
+ node.children[part] = newNode
+ node = newNode
+ } else {
+ node = val.(*partitionMapNode)
+ }
+ }
+
+ // Last level stores the actual partitionInfo
+ lastKey := partitionRec[len(partitionRec)-1]
+ partVal, ok := node.children[lastKey].(*partitionInfo)
+ if ok {
+ return partVal
+ }
+
+ // First time seeing this partition - create partitionValues map
+ partitionValues := make(map[int]any, len(partitionRec))
+
+ // Copy partitionRec values so they don't get overwritten
+ partRecCopy := make(partitionRecord, len(partitionRec))
+ for i := range partitionRec {
+ partitionValues[fieldInfo[i].fieldID] = partitionRec[i]
+ partRecCopy[i] = partitionRec[i]
+ }
+
+ partVal = &partitionInfo{
+ rows: make([]int64, 0, 128), // modest starting
capacity
+ partitionValues: partitionValues,
+ partitionRec: partRecCopy,
+ }
+ node.children[lastKey] = partVal
+ node.leafCount++
+
+ return partVal
+}
+
+// collectPartitions recursively collects all partitionInfo into a slice
+func (n *partitionMapNode) collectPartitions() []*partitionInfo {
+ result := make([]*partitionInfo, 0, n.leafCount)
+
+ for _, v := range n.children {
+ switch node := v.(type) {
+ case *partitionInfo:
+ result = append(result, node)
+ case *partitionMapNode:
+ // Recursively collect from child nodes
+ result = append(result, node.collectPartitions()...)
+ }
}
- return partitionMap, nil
+ return result
}
type partitionBatchFn func(arrow.RecordBatch, []int64) (arrow.RecordBatch,
error)
@@ -297,37 +378,50 @@ func getArrowValueAsIcebergLiteral(column arrow.Array,
row int) (iceberg.Literal
case *extensions.UUIDArray:
return iceberg.NewLiteral(arr.Value(row)), nil
+
+ case *array.String:
+
+ return iceberg.NewLiteral(arr.Value(row)), nil
+ case *array.Int64:
+
+ return iceberg.NewLiteral(arr.Value(row)), nil
+ case *array.Int32:
+
+ return iceberg.NewLiteral(arr.Value(row)), nil
+ case *array.Int16:
+
+ return iceberg.NewLiteral(int32(arr.Value(row))), nil
+ case *array.Int8:
+
+ return iceberg.NewLiteral(int32(arr.Value(row))), nil
+ case *array.Uint64:
+
+ return iceberg.NewLiteral(int64(arr.Value(row))), nil
+ case *array.Uint32:
+
+ return iceberg.NewLiteral(int32(arr.Value(row))), nil
+ case *array.Uint16:
+
+ return iceberg.NewLiteral(int32(arr.Value(row))), nil
+ case *array.Uint8:
+
+ return iceberg.NewLiteral(int32(arr.Value(row))), nil
+ case *array.Float32:
+
+ return iceberg.NewLiteral(arr.Value(row)), nil
+ case *array.Float64:
+
+ return iceberg.NewLiteral(arr.Value(row)), nil
+ case *array.Boolean:
+
+ return iceberg.NewLiteral(arr.Value(row)), nil
+ case *array.Binary:
+
+ return iceberg.NewLiteral(arr.Value(row)), nil
+
default:
val := column.GetOneForMarshal(row)
- switch v := val.(type) {
- case bool:
- return iceberg.NewLiteral(v), nil
- case int8:
- return iceberg.NewLiteral(int32(v)), nil
- case uint8:
- return iceberg.NewLiteral(int32(v)), nil
- case int16:
- return iceberg.NewLiteral(int32(v)), nil
- case uint16:
- return iceberg.NewLiteral(int32(v)), nil
- case int32:
- return iceberg.NewLiteral(v), nil
- case uint32:
- return iceberg.NewLiteral(int32(v)), nil
- case int64:
- return iceberg.NewLiteral(v), nil
- case uint64:
- return iceberg.NewLiteral(int64(v)), nil
- case float32:
- return iceberg.NewLiteral(v), nil
- case float64:
- return iceberg.NewLiteral(v), nil
- case string:
- return iceberg.NewLiteral(v), nil
- case []byte:
- return iceberg.NewLiteral(v), nil
- default:
- return nil, fmt.Errorf("unsupported value type: %T", v)
- }
+
+ return nil, fmt.Errorf("unsupported value type: %T", val)
}
}
diff --git a/table/partitioned_throughput_bench_test.go
b/table/partitioned_throughput_bench_test.go
new file mode 100644
index 00000000..76527236
--- /dev/null
+++ b/table/partitioned_throughput_bench_test.go
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+ "context"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/table"
+)
+
+// BenchmarkPartitionedWriteThroughput benchmarks the full table.Append() path
+func BenchmarkPartitionedWriteThroughput(b *testing.B) {
+ ctx := context.Background()
+ mem := memory.NewGoAllocator()
+
+ // Define Iceberg schema matching reproducer
+ icebergSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "ts", Type:
iceberg.PrimitiveTypes.TimestampTz, Required: true},
+ iceberg.NestedField{ID: 3, Name: "host", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ iceberg.NestedField{ID: 4, Name: "status_code", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 5, Name: "bytes_sent", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 6, Name: "user_agent", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ )
+
+ // Define Arrow schema (must match Iceberg schema types)
+ arrSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "ts", Type: &arrow.TimestampType{Unit:
arrow.Microsecond, TimeZone: "UTC"}, Nullable: false},
+ {Name: "host", Type: arrow.BinaryTypes.String, Nullable: false},
+ {Name: "status_code", Type: arrow.PrimitiveTypes.Int32,
Nullable: false},
+ {Name: "bytes_sent", Type: arrow.PrimitiveTypes.Int64,
Nullable: false},
+ {Name: "user_agent", Type: arrow.BinaryTypes.String, Nullable:
false},
+ }, nil)
+
+ // Define partition spec (partitioned by day(ts) and host)
+ spec := iceberg.NewPartitionSpecID(1,
+ iceberg.PartitionField{SourceID: 2, FieldID: 1000, Transform:
iceberg.DayTransform{}, Name: "ts_day"},
+ iceberg.PartitionField{SourceID: 3, FieldID: 1001, Transform:
iceberg.IdentityTransform{}, Name: "host"},
+ )
+
+ // Helper to create a batch of records
+ createBatch := func(numRecords int) arrow.RecordBatch {
+ idB := array.NewInt64Builder(mem)
+ tsB := array.NewTimestampBuilder(mem,
&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "UTC"})
+ hostB := array.NewStringBuilder(mem)
+ statusB := array.NewInt32Builder(mem)
+ bytesB := array.NewInt64Builder(mem)
+ uaB := array.NewStringBuilder(mem)
+
+ defer idB.Release()
+ defer tsB.Release()
+ defer hostB.Release()
+ defer statusB.Release()
+ defer bytesB.Release()
+ defer uaB.Release()
+
+ baseTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
+ hosts := []string{"example.com", "foo.test.org", "demo.net"}
+ userAgents := []string{"Mozilla/5.0", "curl/7.68.0",
"python-requests/2.25.1"}
+
+ for i := 0; i < numRecords; i++ {
+ idB.Append(int64(i))
+ // Spread timestamps across multiple days to create
multiple partitions
+ ts := baseTime.Add(time.Duration(i%10) * 24 * time.Hour)
+ tsB.Append(arrow.Timestamp(ts.UnixMicro()))
+ hostB.Append(hosts[i%len(hosts)])
+ statusB.Append(200 + int32(i%5))
+ bytesB.Append(int64(1000 + i%5000))
+ uaB.Append(userAgents[i%len(userAgents)])
+ }
+
+ return array.NewRecordBatch(arrSchema, []arrow.Array{
+ idB.NewArray(),
+ tsB.NewArray(),
+ hostB.NewArray(),
+ statusB.NewArray(),
+ bytesB.NewArray(),
+ uaB.NewArray(),
+ }, int64(numRecords))
+ }
+
+ // Run benchmarks with different batch sizes
+ benchSizes := []struct {
+ name string
+ numRecords int
+ }{
+ {"100K_records", 100_000},
+ {"500K_records", 500_000},
+ {"2.5M_records", 2_500_000},
+ }
+
+ for _, bs := range benchSizes {
+ b.Run(bs.name, func(b *testing.B) {
+ // Setup warehouse directory and catalog
+ loc := filepath.ToSlash(b.TempDir())
+
+ // Create in-memory SQL catalog
+ cat, err := catalog.Load(ctx, "benchmark",
iceberg.Properties{
+ "type": "sql",
+ "uri": ":memory:",
+ "sql.dialect": "sqlite",
+ "sql.driver": "sqlite",
+ "warehouse": "file://" + loc,
+ })
+ if err != nil {
+ b.Fatalf("Failed to create catalog: %v", err)
+ }
+
+ // Create namespace
+ ns := table.Identifier{"benchmark"}
+ err = cat.CreateNamespace(ctx, ns, iceberg.Properties{})
+ if err != nil {
+ b.Fatalf("Failed to create namespace: %v", err)
+ }
+
+ // Create table with partition spec
+ tableID := table.Identifier{"benchmark",
"partitioned_table"}
+ tbl, err := cat.CreateTable(ctx, tableID, icebergSchema,
+ catalog.WithPartitionSpec(&spec),
+ )
+ if err != nil {
+ b.Fatalf("Failed to create table: %v", err)
+ }
+
+ // Pre-create batch to avoid measuring batch creation
time
+ testBatch := createBatch(bs.numRecords)
+ defer testBatch.Release()
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ totalRecords := int64(0)
+ for i := 0; i < b.N; i++ {
+ reader, err := array.NewRecordReader(arrSchema,
[]arrow.RecordBatch{testBatch})
+ if err != nil {
+ b.Fatalf("Failed to create reader: %v",
err)
+ }
+
+ newTable, err := tbl.Append(ctx, reader,
iceberg.Properties{})
+ if err != nil {
+ reader.Release()
+ b.Fatalf("Append error: %v", err)
+ }
+
+ reader.Release()
+ tbl = newTable
+ totalRecords += int64(bs.numRecords)
+ }
+
+ b.StopTimer()
+
+ // Report throughput
+ recordsPerOp := float64(bs.numRecords)
+ recordsPerSec := recordsPerOp / (b.Elapsed().Seconds()
/ float64(b.N))
+ b.ReportMetric(recordsPerSec, "records/sec")
+ b.ReportMetric(float64(totalRecords), "total_records")
+ })
+ }
+}
diff --git a/transforms.go b/transforms.go
index e2ce3f6b..18e799fb 100644
--- a/transforms.go
+++ b/transforms.go
@@ -127,6 +127,8 @@ func (IdentityTransform) ToHumanStr(val any) string {
switch v := val.(type) {
case nil:
return "null"
+ case string:
+ return v
case []byte:
return base64.StdEncoding.EncodeToString(v)
case bool: