This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 8f032ad9 test: benchmark write performance for complex types and many
partitions (#643)
8f032ad9 is described below
commit 8f032ad9b9e58d4ff5cb80fab713b6c6adf2188e
Author: Alex <[email protected]>
AuthorDate: Tue Dec 9 08:36:58 2025 -0700
test: benchmark write performance for complex types and many partitions
(#643)
## Add Comprehensive Partitioned Write Performance Benchmarks
Expands benchmark suite to measure partitioned write performance across
different schema complexities and partition counts.
### Benchmarks Added
**Schema Complexity Tests** - All partitioned by `day(ts)` and `host`
identity:
1. **Simple** - 6 primitive fields (int64, timestamp, string, int32) -
baseline performance
2. **ListPrimitive** - Simple schema + `list<string>` field - tests
nested list handling
3. **ListStruct** - Simple schema + `list<struct<type: string, id:
list<string>>>` field - tests deeply nested structures with lists of
structs containing lists
4. **MapPrimitive** - Simple schema + `map<string, string>` field -
tests map type handling
Each schema is tested at 100K, 500K, and 2.5M record counts.
**Partition Scaling Test** - Simple 4-field schema partitioned by single
`partition_key` field, testing 100K records across 25, 100, 250, and
1000 partitions to measure how write performance scales with partition
count.
### Purpose
- Establishes baseline metrics for write performance across different
Arrow/Iceberg schemas
- Measures performance improvement/degradation as libraries evolve.
---
table/partitioned_throughput_bench_test.go | 502 +++++++++++++++++++++++++++--
1 file changed, 469 insertions(+), 33 deletions(-)
diff --git a/table/partitioned_throughput_bench_test.go
b/table/partitioned_throughput_bench_test.go
index 76527236..2ba1a98d 100644
--- a/table/partitioned_throughput_bench_test.go
+++ b/table/partitioned_throughput_bench_test.go
@@ -29,14 +29,103 @@ import (
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/table"
+ "github.com/apache/iceberg-go/table/internal"
)
-// BenchmarkPartitionedWriteThroughput benchmarks the full table.Append() path
-func BenchmarkPartitionedWriteThroughput(b *testing.B) {
+// Common benchmark configuration
+var benchSizes = []struct {
+ name string
+ numRecords int
+}{
+ {"100K_records", 100_000},
+ {"500K_records", 500_000},
+ {"2.5M_records", 2_500_000},
+}
+
+// runBenchmark is a helper function to run a benchmark with a given schema
and batch creator
+func runBenchmark(b *testing.B, icebergSchema *iceberg.Schema, arrSchema
*arrow.Schema, createBatch func(int) arrow.RecordBatch) {
ctx := context.Background()
- mem := memory.NewGoAllocator()
- // Define Iceberg schema matching reproducer
+ // Define partition spec (partitioned by day(ts) and host)
+ spec := iceberg.NewPartitionSpecID(1,
+ iceberg.PartitionField{SourceID: 2, FieldID: 1000, Transform:
iceberg.DayTransform{}, Name: "ts_day"},
+ iceberg.PartitionField{SourceID: 3, FieldID: 1001, Transform:
iceberg.IdentityTransform{}, Name: "host"},
+ )
+
+ for _, bs := range benchSizes {
+ b.Run(bs.name, func(b *testing.B) {
+ // Setup warehouse directory and catalog
+ loc := filepath.ToSlash(b.TempDir())
+
+ // Create in-memory SQL catalog
+ cat, err := catalog.Load(ctx, "benchmark",
iceberg.Properties{
+ "type": "sql",
+ "uri": ":memory:",
+ "sql.dialect": "sqlite",
+ "sql.driver": "sqlite",
+ "warehouse": "file://" + loc,
+ })
+ if err != nil {
+ b.Fatalf("Failed to create catalog: %v", err)
+ }
+
+ // Create namespace
+ ns := table.Identifier{"benchmark"}
+ err = cat.CreateNamespace(ctx, ns, iceberg.Properties{})
+ if err != nil {
+ b.Fatalf("Failed to create namespace: %v", err)
+ }
+
+ // Create table with partition spec
+ tableID := table.Identifier{"benchmark",
"partitioned_table"}
+ tbl, err := cat.CreateTable(ctx, tableID, icebergSchema,
+ catalog.WithPartitionSpec(&spec),
+ )
+ if err != nil {
+ b.Fatalf("Failed to create table: %v", err)
+ }
+
+ // Pre-create batch to avoid measuring batch creation
time
+ testBatch := createBatch(bs.numRecords)
+ defer testBatch.Release()
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ totalRecords := int64(0)
+ for i := 0; i < b.N; i++ {
+ reader, err := array.NewRecordReader(arrSchema,
[]arrow.RecordBatch{testBatch})
+ if err != nil {
+ b.Fatalf("Failed to create reader: %v",
err)
+ }
+
+ newTable, err := tbl.Append(ctx, reader,
iceberg.Properties{})
+ if err != nil {
+ reader.Release()
+ b.Fatalf("Append error: %v", err)
+ }
+
+ reader.Release()
+ tbl = newTable
+ totalRecords += int64(bs.numRecords)
+ }
+
+ b.StopTimer()
+
+ // Report throughput
+ recordsPerOp := float64(bs.numRecords)
+ recordsPerSec := recordsPerOp / (b.Elapsed().Seconds()
/ float64(b.N))
+ b.ReportMetric(recordsPerSec, "records/sec")
+ b.ReportMetric(float64(totalRecords), "total_records")
+ })
+ }
+}
+
+// BenchmarkPartitionedWriteThroughput_Simple benchmarks with simple primitive
types only
+func BenchmarkPartitionedWriteThroughput_Simple(b *testing.B) {
+ mem := memory.DefaultAllocator
+
+ // Define Iceberg schema with only primitive types
icebergSchema := iceberg.NewSchema(0,
iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
iceberg.NestedField{ID: 2, Name: "ts", Type:
iceberg.PrimitiveTypes.TimestampTz, Required: true},
@@ -56,12 +145,182 @@ func BenchmarkPartitionedWriteThroughput(b *testing.B) {
{Name: "user_agent", Type: arrow.BinaryTypes.String, Nullable:
false},
}, nil)
- // Define partition spec (partitioned by day(ts) and host)
- spec := iceberg.NewPartitionSpecID(1,
- iceberg.PartitionField{SourceID: 2, FieldID: 1000, Transform:
iceberg.DayTransform{}, Name: "ts_day"},
- iceberg.PartitionField{SourceID: 3, FieldID: 1001, Transform:
iceberg.IdentityTransform{}, Name: "host"},
+ // Helper to create a batch of records
+ createBatch := func(numRecords int) arrow.RecordBatch {
+ idB := array.NewInt64Builder(mem)
+ tsB := array.NewTimestampBuilder(mem,
&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "UTC"})
+ hostB := array.NewStringBuilder(mem)
+ statusB := array.NewInt32Builder(mem)
+ bytesB := array.NewInt64Builder(mem)
+ uaB := array.NewStringBuilder(mem)
+
+ defer idB.Release()
+ defer tsB.Release()
+ defer hostB.Release()
+ defer statusB.Release()
+ defer bytesB.Release()
+ defer uaB.Release()
+
+ baseTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
+ hosts := []string{"example.com", "foo.test.org", "demo.net"}
+ userAgents := []string{"Mozilla/5.0", "curl/7.68.0",
"python-requests/2.25.1"}
+
+ for i := 0; i < numRecords; i++ {
+ idB.Append(int64(i))
+ ts := baseTime.Add(time.Duration(i%10) * 24 * time.Hour)
+ tsB.Append(arrow.Timestamp(ts.UnixMicro()))
+ hostB.Append(hosts[i%len(hosts)])
+ statusB.Append(200 + int32(i%5))
+ bytesB.Append(int64(1000 + i%5000))
+ uaB.Append(userAgents[i%len(userAgents)])
+ }
+
+ return array.NewRecordBatch(arrSchema, []arrow.Array{
+ idB.NewArray(),
+ tsB.NewArray(),
+ hostB.NewArray(),
+ statusB.NewArray(),
+ bytesB.NewArray(),
+ uaB.NewArray(),
+ }, int64(numRecords))
+ }
+
+ runBenchmark(b, icebergSchema, arrSchema, createBatch)
+}
+
+// BenchmarkPartitionedWriteThroughput_ListPrimitive benchmarks with a list of
primitive types
+func BenchmarkPartitionedWriteThroughput_ListPrimitive(b *testing.B) {
+ mem := memory.DefaultAllocator
+
+ // Define Iceberg schema with list<string>
+ icebergSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "ts", Type:
iceberg.PrimitiveTypes.TimestampTz, Required: true},
+ iceberg.NestedField{ID: 3, Name: "host", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ iceberg.NestedField{ID: 4, Name: "status_code", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 5, Name: "bytes_sent", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 6, Name: "user_agent", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ iceberg.NestedField{ID: 7, Name: "tags", Type:
&iceberg.ListType{
+ ElementID: 8,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: true,
+ }, Required: false},
+ )
+
+ // Define Arrow schema
+ arrSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "ts", Type: &arrow.TimestampType{Unit:
arrow.Microsecond, TimeZone: "UTC"}, Nullable: false},
+ {Name: "host", Type: arrow.BinaryTypes.String, Nullable: false},
+ {Name: "status_code", Type: arrow.PrimitiveTypes.Int32,
Nullable: false},
+ {Name: "bytes_sent", Type: arrow.PrimitiveTypes.Int64,
Nullable: false},
+ {Name: "user_agent", Type: arrow.BinaryTypes.String, Nullable:
false},
+ {Name: "tags", Type: arrow.ListOf(arrow.BinaryTypes.String),
Nullable: true},
+ }, nil)
+
+ // Helper to create a batch of records
+ createBatch := func(numRecords int) arrow.RecordBatch {
+ idB := array.NewInt64Builder(mem)
+ tsB := array.NewTimestampBuilder(mem,
&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "UTC"})
+ hostB := array.NewStringBuilder(mem)
+ statusB := array.NewInt32Builder(mem)
+ bytesB := array.NewInt64Builder(mem)
+ uaB := array.NewStringBuilder(mem)
+ tagsB := array.NewListBuilder(mem, arrow.BinaryTypes.String)
+ tagsValueB := tagsB.ValueBuilder().(*array.StringBuilder)
+
+ defer idB.Release()
+ defer tsB.Release()
+ defer hostB.Release()
+ defer statusB.Release()
+ defer bytesB.Release()
+ defer uaB.Release()
+ defer tagsB.Release()
+
+ baseTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
+ hosts := []string{"example.com", "foo.test.org", "demo.net"}
+ userAgents := []string{"Mozilla/5.0", "curl/7.68.0",
"python-requests/2.25.1"}
+ tagsList := []string{"tag_0", "tag_1", "tag_2", "tag_3",
"tag_4"}
+
+ for i := 0; i < numRecords; i++ {
+ idB.Append(int64(i))
+ ts := baseTime.Add(time.Duration(i%10) * 24 * time.Hour)
+ tsB.Append(arrow.Timestamp(ts.UnixMicro()))
+ hostB.Append(hosts[i%len(hosts)])
+ statusB.Append(200 + int32(i%5))
+ bytesB.Append(int64(1000 + i%5000))
+ uaB.Append(userAgents[i%len(userAgents)])
+
+ // Add 2-5 tags per record
+ numTags := 2 + (i % 4)
+ tagsB.Append(true)
+ for j := 0; j < numTags; j++ {
+ tagsValueB.Append(tagsList[j%len(tagsList)])
+ }
+ }
+
+ return array.NewRecordBatch(arrSchema, []arrow.Array{
+ idB.NewArray(),
+ tsB.NewArray(),
+ hostB.NewArray(),
+ statusB.NewArray(),
+ bytesB.NewArray(),
+ uaB.NewArray(),
+ tagsB.NewArray(),
+ }, int64(numRecords))
+ }
+
+ runBenchmark(b, icebergSchema, arrSchema, createBatch)
+}
+
+// BenchmarkPartitionedWriteThroughput_ListStruct benchmarks with a list of
structs containing nested lists
+func BenchmarkPartitionedWriteThroughput_ListStruct(b *testing.B) {
+ mem := memory.DefaultAllocator
+
+ // Resource struct type: {type: string, id: list<string>}
+ resourceStruct := &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 8, Name: "type", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ {ID: 9, Name: "id", Type: &iceberg.ListType{
+ ElementID: 10,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: true,
+ }, Required: true},
+ },
+ }
+
+ // List of resources: list<struct<type: string, id: list<string>>>
+ resourcesListType := &iceberg.ListType{
+ ElementID: 11,
+ Element: resourceStruct,
+ ElementRequired: true,
+ }
+
+ // Define Iceberg schema with complex nested types
+ icebergSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "ts", Type:
iceberg.PrimitiveTypes.TimestampTz, Required: true},
+ iceberg.NestedField{ID: 3, Name: "host", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ iceberg.NestedField{ID: 4, Name: "status_code", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 5, Name: "bytes_sent", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 6, Name: "user_agent", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ iceberg.NestedField{ID: 7, Name: "resources", Type:
resourcesListType, Required: false},
)
+ // Define Arrow schema
+ arrSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "ts", Type: &arrow.TimestampType{Unit:
arrow.Microsecond, TimeZone: "UTC"}, Nullable: false},
+ {Name: "host", Type: arrow.BinaryTypes.String, Nullable: false},
+ {Name: "status_code", Type: arrow.PrimitiveTypes.Int32,
Nullable: false},
+ {Name: "bytes_sent", Type: arrow.PrimitiveTypes.Int64,
Nullable: false},
+ {Name: "user_agent", Type: arrow.BinaryTypes.String, Nullable:
false},
+ {Name: "resources", Type: arrow.ListOf(arrow.StructOf(
+ arrow.Field{Name: "type", Type:
arrow.BinaryTypes.String, Nullable: false},
+ arrow.Field{Name: "id", Type:
arrow.ListOf(arrow.BinaryTypes.String), Nullable: false},
+ )), Nullable: true},
+ }, nil)
+
// Helper to create a batch of records
createBatch := func(numRecords int) arrow.RecordBatch {
idB := array.NewInt64Builder(mem)
@@ -71,26 +330,52 @@ func BenchmarkPartitionedWriteThroughput(b *testing.B) {
bytesB := array.NewInt64Builder(mem)
uaB := array.NewStringBuilder(mem)
+ // Build resources: list<struct<type: string, id: list<string>>>
+ resourcesB := array.NewListBuilder(mem, arrow.StructOf(
+ arrow.Field{Name: "type", Type:
arrow.BinaryTypes.String, Nullable: false},
+ arrow.Field{Name: "id", Type:
arrow.ListOf(arrow.BinaryTypes.String), Nullable: false},
+ ))
+ resourceStructB :=
resourcesB.ValueBuilder().(*array.StructBuilder)
+ resourceTypeB :=
resourceStructB.FieldBuilder(0).(*array.StringBuilder)
+ resourceIdListB :=
resourceStructB.FieldBuilder(1).(*array.ListBuilder)
+ resourceIdB :=
resourceIdListB.ValueBuilder().(*array.StringBuilder)
+
defer idB.Release()
defer tsB.Release()
defer hostB.Release()
defer statusB.Release()
defer bytesB.Release()
defer uaB.Release()
+ defer resourcesB.Release()
baseTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
hosts := []string{"example.com", "foo.test.org", "demo.net"}
userAgents := []string{"Mozilla/5.0", "curl/7.68.0",
"python-requests/2.25.1"}
+ resourceTypes := []string{"dataset", "station", "network"}
for i := 0; i < numRecords; i++ {
idB.Append(int64(i))
- // Spread timestamps across multiple days to create
multiple partitions
ts := baseTime.Add(time.Duration(i%10) * 24 * time.Hour)
tsB.Append(arrow.Timestamp(ts.UnixMicro()))
hostB.Append(hosts[i%len(hosts)])
statusB.Append(200 + int32(i%5))
bytesB.Append(int64(1000 + i%5000))
uaB.Append(userAgents[i%len(userAgents)])
+
+ // Add resources - vary between 1-3 resources per record
+ numResources := 1 + (i % 3)
+ resourcesB.Append(true)
+ for j := 0; j < numResources; j++ {
+ resourceStructB.Append(true)
+
resourceTypeB.Append(resourceTypes[j%len(resourceTypes)])
+
+ // Add 2-4 IDs per resource
+ numIds := 2 + (j % 3)
+ resourceIdListB.Append(true)
+ for k := 0; k < numIds; k++ {
+ resourceIdB.Append("id_" +
string(rune('A'+k)))
+ }
+ }
}
return array.NewRecordBatch(arrSchema, []arrow.Array{
@@ -100,45 +385,197 @@ func BenchmarkPartitionedWriteThroughput(b *testing.B) {
statusB.NewArray(),
bytesB.NewArray(),
uaB.NewArray(),
+ resourcesB.NewArray(),
}, int64(numRecords))
}
- // Run benchmarks with different batch sizes
- benchSizes := []struct {
- name string
- numRecords int
+ runBenchmark(b, icebergSchema, arrSchema, createBatch)
+}
+
+// BenchmarkPartitionedWriteThroughput_MapPrimitive benchmarks with a map of
primitive types
+func BenchmarkPartitionedWriteThroughput_MapPrimitive(b *testing.B) {
+ mem := memory.DefaultAllocator
+
+ // Define Iceberg schema with map<string, string>
+ icebergSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "ts", Type:
iceberg.PrimitiveTypes.TimestampTz, Required: true},
+ iceberg.NestedField{ID: 3, Name: "host", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ iceberg.NestedField{ID: 4, Name: "status_code", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 5, Name: "bytes_sent", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 6, Name: "user_agent", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ iceberg.NestedField{ID: 7, Name: "tags", Type: &iceberg.MapType{
+ KeyID: 8,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 9,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false},
+ )
+
+ // Define Arrow schema
+ arrSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "ts", Type: &arrow.TimestampType{Unit:
arrow.Microsecond, TimeZone: "UTC"}, Nullable: false},
+ {Name: "host", Type: arrow.BinaryTypes.String, Nullable: false},
+ {Name: "status_code", Type: arrow.PrimitiveTypes.Int32,
Nullable: false},
+ {Name: "bytes_sent", Type: arrow.PrimitiveTypes.Int64,
Nullable: false},
+ {Name: "user_agent", Type: arrow.BinaryTypes.String, Nullable:
false},
+ {Name: "tags", Type: arrow.MapOf(arrow.BinaryTypes.String,
arrow.BinaryTypes.String), Nullable: true},
+ }, nil)
+
+ // Helper to create a batch of records
+ createBatch := func(numRecords int) arrow.RecordBatch {
+ idB := array.NewInt64Builder(mem)
+ tsB := array.NewTimestampBuilder(mem,
&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "UTC"})
+ hostB := array.NewStringBuilder(mem)
+ statusB := array.NewInt32Builder(mem)
+ bytesB := array.NewInt64Builder(mem)
+ uaB := array.NewStringBuilder(mem)
+ tagsB := array.NewMapBuilder(mem, arrow.BinaryTypes.String,
arrow.BinaryTypes.String, false)
+ tagsKeyB := tagsB.KeyBuilder().(*array.StringBuilder)
+ tagsItemB := tagsB.ItemBuilder().(*array.StringBuilder)
+
+ defer idB.Release()
+ defer tsB.Release()
+ defer hostB.Release()
+ defer statusB.Release()
+ defer bytesB.Release()
+ defer uaB.Release()
+ defer tagsB.Release()
+ defer tagsKeyB.Release()
+ defer tagsItemB.Release()
+
+ baseTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
+ hosts := []string{"example.com", "foo.test.org", "demo.net"}
+ userAgents := []string{"Mozilla/5.0", "curl/7.68.0",
"python-requests/2.25.1"}
+ tagsKeys := []string{"tag_0", "tag_1", "tag_2", "tag_3",
"tag_4"}
+ tagsItems := []string{"item_0", "item_1", "item_2", "item_3",
"item_4", "item_5", "item_6", "item_7", "item_8", "item_9"}
+
+ for i := 0; i < numRecords; i++ {
+ idB.Append(int64(i))
+ ts := baseTime.Add(time.Duration(i%10) * 24 * time.Hour)
+ tsB.Append(arrow.Timestamp(ts.UnixMicro()))
+ hostB.Append(hosts[i%len(hosts)])
+ statusB.Append(200 + int32(i%5))
+ bytesB.Append(int64(1000 + i%5000))
+ uaB.Append(userAgents[i%len(userAgents)])
+
+ // Add 2-5 tags per record
+ numTags := 2 + (i % 4)
+ tagsB.Append(true)
+ for j := 0; j < numTags; j++ {
+ tagsKeyB.Append(tagsKeys[j%len(tagsKeys)])
+ tagsItemB.Append(tagsItems[j%len(tagsItems)])
+ }
+ }
+
+ return array.NewRecordBatch(arrSchema, []arrow.Array{
+ idB.NewArray(),
+ tsB.NewArray(),
+ hostB.NewArray(),
+ statusB.NewArray(),
+ bytesB.NewArray(),
+ uaB.NewArray(),
+ tagsB.NewArray(),
+ }, int64(numRecords))
+ }
+
+ runBenchmark(b, icebergSchema, arrSchema, createBatch)
+}
+
+// BenchmarkPartitionedWriteThroughput_PartitionCount tests how write
performance scales with partition count
+func BenchmarkPartitionedWriteThroughput_PartitionCount(b *testing.B) {
+ mem := memory.DefaultAllocator
+ ctx := context.Background()
+
+ // Define Iceberg schema
+ icebergSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "ts", Type:
iceberg.PrimitiveTypes.TimestampTz, Required: true},
+ iceberg.NestedField{ID: 3, Name: "partition_key", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 4, Name: "value", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ )
+
+ // Define Arrow schema
+ arrSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "ts", Type: &arrow.TimestampType{Unit:
arrow.Microsecond, TimeZone: "UTC"}, Nullable: false},
+ {Name: "partition_key", Type: arrow.PrimitiveTypes.Int32,
Nullable: false},
+ {Name: "value", Type: arrow.PrimitiveTypes.Int64, Nullable:
false},
+ }, nil)
+
+ // Partition spec - only by partition_key
+ spec := iceberg.NewPartitionSpecID(1,
+ iceberg.PartitionField{SourceID: 3, FieldID: 1000, Transform:
iceberg.IdentityTransform{}, Name: "partition_key"},
+ )
+
+ numRecords := 100_000
+
+ partitionTests := []struct {
+ name string
+ partitionCount int
}{
- {"100K_records", 100_000},
- {"500K_records", 500_000},
- {"2.5M_records", 2_500_000},
+ {"25_partitions", 25},
+ {"100_partitions", 100},
+ {"250_partitions", 250},
+ {"1000_partitions", 1000},
}
- for _, bs := range benchSizes {
- b.Run(bs.name, func(b *testing.B) {
- // Setup warehouse directory and catalog
+ for _, pt := range partitionTests {
+ b.Run(pt.name, func(b *testing.B) {
+ // Helper to create a batch with specified partition
distribution
+ createBatch := func(numRecords, numPartitions int)
arrow.RecordBatch {
+ idB := array.NewInt64Builder(mem)
+ tsB := array.NewTimestampBuilder(mem,
&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "UTC"})
+ partKeyB := array.NewInt32Builder(mem)
+ valueB := array.NewInt64Builder(mem)
+
+ defer idB.Release()
+ defer tsB.Release()
+ defer partKeyB.Release()
+ defer valueB.Release()
+
+ baseTime := time.Date(2024, 1, 1, 0, 0, 0, 0,
time.UTC)
+
+ for i := 0; i < numRecords; i++ {
+ idB.Append(int64(i))
+
tsB.Append(arrow.Timestamp(baseTime.Add(time.Duration(i) *
time.Second).UnixMicro()))
+ partKeyB.Append(int32(i %
numPartitions))
+ valueB.Append(int64(i * 1000))
+ }
+
+ return array.NewRecordBatch(arrSchema,
[]arrow.Array{
+ idB.NewArray(),
+ tsB.NewArray(),
+ partKeyB.NewArray(),
+ valueB.NewArray(),
+ }, int64(numRecords))
+ }
+
+ // Setup
loc := filepath.ToSlash(b.TempDir())
- // Create in-memory SQL catalog
cat, err := catalog.Load(ctx, "benchmark",
iceberg.Properties{
- "type": "sql",
- "uri": ":memory:",
- "sql.dialect": "sqlite",
- "sql.driver": "sqlite",
- "warehouse": "file://" + loc,
+ "type": "sql",
+ "uri": ":memory:",
+ "sql.dialect": "sqlite",
+ "sql.driver": "sqlite",
+ "warehouse": "file://"
+ loc,
+ internal.ParquetCompressionKey: "zstd",
+ internal.ParquetCompressionLevelKey: "3",
})
if err != nil {
b.Fatalf("Failed to create catalog: %v", err)
}
- // Create namespace
ns := table.Identifier{"benchmark"}
err = cat.CreateNamespace(ctx, ns, iceberg.Properties{})
if err != nil {
b.Fatalf("Failed to create namespace: %v", err)
}
- // Create table with partition spec
- tableID := table.Identifier{"benchmark",
"partitioned_table"}
+ tableID := table.Identifier{"benchmark",
"partition_scale_test"}
tbl, err := cat.CreateTable(ctx, tableID, icebergSchema,
catalog.WithPartitionSpec(&spec),
)
@@ -146,8 +583,7 @@ func BenchmarkPartitionedWriteThroughput(b *testing.B) {
b.Fatalf("Failed to create table: %v", err)
}
- // Pre-create batch to avoid measuring batch creation
time
- testBatch := createBatch(bs.numRecords)
+ testBatch := createBatch(numRecords, pt.partitionCount)
defer testBatch.Release()
b.ResetTimer()
@@ -168,15 +604,15 @@ func BenchmarkPartitionedWriteThroughput(b *testing.B) {
reader.Release()
tbl = newTable
- totalRecords += int64(bs.numRecords)
+ totalRecords += int64(numRecords)
}
b.StopTimer()
- // Report throughput
- recordsPerOp := float64(bs.numRecords)
+ recordsPerOp := float64(numRecords)
recordsPerSec := recordsPerOp / (b.Elapsed().Seconds()
/ float64(b.N))
b.ReportMetric(recordsPerSec, "records/sec")
+ b.ReportMetric(float64(pt.partitionCount), "partitions")
b.ReportMetric(float64(totalRecords), "total_records")
})
}