2010YOUY01 commented on code in PR #16512: URL: https://github.com/apache/datafusion/pull/16512#discussion_r2168056666
########## datafusion/physical-plan/benches/spill_io.rs: ########## @@ -119,5 +126,414 @@ fn bench_spill_io(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, bench_spill_io); +// Generate `num_batches` RecordBatches mimicking TPC-H Q2's partial aggregate result: +// GROUP BY ps_partkey -> MIN(ps_supplycost) +fn create_q2_like_batches(num_batches: usize) -> (Arc<Schema>, Vec<RecordBatch>) { + // use fixed seed + let seed = 2; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(num_batches); + + let mut current_key = 400000_i64; + + let schema = Arc::new(Schema::new(vec![ + Field::new("ps_partkey", DataType::Int64, false), + Field::new("min_ps_supplycost", DataType::Decimal128(15, 2), true), + ])); + + for _ in 0..num_batches { + let mut partkey_builder = Int64Builder::new(); + let mut cost_builder = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + + for _ in 0..8192 { Review Comment: I think we should make this `8192` a global variable or function arg ########## datafusion/physical-plan/benches/spill_io.rs: ########## @@ -119,5 +126,414 @@ fn bench_spill_io(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, bench_spill_io); +// Generate `num_batches` RecordBatches mimicking TPC-H Q2's partial aggregate result: +// GROUP BY ps_partkey -> MIN(ps_supplycost) +fn create_q2_like_batches(num_batches: usize) -> (Arc<Schema>, Vec<RecordBatch>) { + // use fixed seed + let seed = 2; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(num_batches); + + let mut current_key = 400000_i64; + + let schema = Arc::new(Schema::new(vec![ + Field::new("ps_partkey", DataType::Int64, false), + Field::new("min_ps_supplycost", DataType::Decimal128(15, 2), true), + ])); + + for _ in 0..num_batches { + let mut partkey_builder = Int64Builder::new(); + let mut cost_builder = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + + 
for _ in 0..8192 { + // Occasionally skip a few partkey values to simulate sparsity + let jump = if rng.random_bool(0.05) { + rng.random_range(2..10) + } else { + 1 + }; + current_key += jump; + + let supply_cost = rng.random_range(10_00..100_000) as i128; + + partkey_builder.append_value(current_key); + cost_builder.append_value(supply_cost); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(partkey_builder.finish()), + Arc::new(cost_builder.finish()), + ], + ) + .unwrap(); + + batches.push(batch); + } + + (schema, batches) +} + +/// Generate `num_batches` RecordBatches mimicking TPC-H Q16's partial aggregate result: +/// GROUP BY (p_brand, p_type, p_size) -> COUNT(DISTINCT ps_suppkey) +pub fn create_q16_like_batches(num_batches: usize) -> (Arc<Schema>, Vec<RecordBatch>) { + let seed = 16; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(num_batches); + + let schema = Arc::new(Schema::new(vec![ + Field::new("p_brand", DataType::Utf8, false), + Field::new("p_type", DataType::Utf8, false), + Field::new("p_size", DataType::Int32, false), + Field::new("alias1", DataType::Int64, false), // COUNT(DISTINCT ps_suppkey) + ])); + + // Representative string pools + let brands = ["Brand#32", "Brand#33", "Brand#41", "Brand#42", "Brand#55"]; + let types = [ + "PROMO ANODIZED NICKEL", + "STANDARD BRUSHED NICKEL", + "PROMO POLISHED COPPER", + "ECONOMY ANODIZED BRASS", + "LARGE BURNISHED COPPER", + "STANDARD POLISHED TIN", + "SMALL PLATED STEEL", + "MEDIUM POLISHED COPPER", + ]; + let sizes = [3, 9, 14, 19, 23, 36, 45, 49]; + + for _ in 0..num_batches { + let mut brand_builder = StringBuilder::new(); + let mut type_builder = StringBuilder::new(); + let mut size_builder = Int32Builder::new(); + let mut count_builder = Int64Builder::new(); + + for _ in 0..8192 { + let brand = brands[rng.random_range(0..brands.len())]; + let ptype = types[rng.random_range(0..types.len())]; + let size = 
sizes[rng.random_range(0..sizes.len())]; + let count = rng.random_range(1000..100_000); + + brand_builder.append_value(brand); + type_builder.append_value(ptype); + size_builder.append_value(size); + count_builder.append_value(count); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(brand_builder.finish()), + Arc::new(type_builder.finish()), + Arc::new(size_builder.finish()), + Arc::new(count_builder.finish()), + ], + ) + .unwrap(); + + batches.push(batch); + } + + (schema, batches) +} + +// Generate `num_batches` RecordBatches mimicking TPC-H Q20's partial aggregate result: +// GROUP BY (l_partkey, l_suppkey) -> SUM(l_quantity) +fn create_q20_like_batches(num_batches: usize) -> (Arc<Schema>, Vec<RecordBatch>) { + let seed = 20; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(num_batches); + + let mut current_partkey = 400000_i64; + + let schema = Arc::new(Schema::new(vec![ + Field::new("l_partkey", DataType::Int64, false), + Field::new("l_suppkey", DataType::Int64, false), + Field::new("sum_l_quantity", DataType::Decimal128(25, 2), true), + ])); + + for _ in 0..num_batches { + let mut partkey_builder = Int64Builder::new(); + let mut suppkey_builder = Int64Builder::new(); + let mut quantity_builder = Decimal128Builder::new() + .with_precision_and_scale(25, 2) + .unwrap(); + + for _ in 0..8192 { + // Occasionally skip a few partkey values to simulate sparsity + let partkey_jump = if rng.random_bool(0.03) { + rng.random_range(2..6) + } else { + 1 + }; + current_partkey += partkey_jump; + + let suppkey = rng.random_range(10_000..99_999); + let quantity = rng.random_range(500..20_000) as i128; + + partkey_builder.append_value(current_partkey); + suppkey_builder.append_value(suppkey); + quantity_builder.append_value(quantity); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(partkey_builder.finish()), + Arc::new(suppkey_builder.finish()), + 
Arc::new(quantity_builder.finish()), + ], + ) + .unwrap(); + + batches.push(batch); + } + + (schema, batches) +} + +/// Genereate `num_batches` wide RecordBatches resembling sort-tpch Q10 for benchmarking. +/// This includes multiple numeric, date, and Utf8View columns (15 total). +pub fn create_wide_batches(num_batches: usize) -> (Arc<Schema>, Vec<RecordBatch>) { + let seed = 10; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(num_batches); + + let schema = Arc::new(Schema::new(vec![ + Field::new("l_linenumber", DataType::Int32, false), + Field::new("l_suppkey", DataType::Int64, false), + Field::new("l_orderkey", DataType::Int64, false), + Field::new("l_partkey", DataType::Int64, false), + Field::new("l_quantity", DataType::Decimal128(15, 2), false), + Field::new("l_extendedprice", DataType::Decimal128(15, 2), false), + Field::new("l_discount", DataType::Decimal128(15, 2), false), + Field::new("l_tax", DataType::Decimal128(15, 2), false), + Field::new("l_returnflag", DataType::Utf8, false), + Field::new("l_linestatus", DataType::Utf8, false), + Field::new("l_shipdate", DataType::Date32, false), + Field::new("l_commitdate", DataType::Date32, false), + Field::new("l_receiptdate", DataType::Date32, false), + Field::new("l_shipinstruct", DataType::Utf8, false), + Field::new("l_shipmode", DataType::Utf8, false), + ])); + + for _ in 0..num_batches { + let mut linenum = Int32Builder::new(); + let mut suppkey = Int64Builder::new(); + let mut orderkey = Int64Builder::new(); + let mut partkey = Int64Builder::new(); + let mut quantity = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut extprice = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut discount = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut tax = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut retflag = StringBuilder::new(); + let 
mut linestatus = StringBuilder::new(); + let mut shipdate = Date32Builder::new(); + let mut commitdate = Date32Builder::new(); + let mut receiptdate = Date32Builder::new(); + let mut shipinstruct = StringBuilder::new(); + let mut shipmode = StringBuilder::new(); + + let return_flags = ["A", "N", "R"]; + let statuses = ["F", "O"]; + let instructs = ["DELIVER IN PERSON", "COLLECT COD", "NONE"]; + let modes = ["TRUCK", "MAIL", "SHIP", "RAIL", "AIR"]; + + for i in 0..8192 { + linenum.append_value(i % 7); + suppkey.append_value(rng.random_range(0..100_000)); + orderkey.append_value(1_000_000 + i as i64); + partkey.append_value(rng.random_range(0..200_000)); + + quantity.append_value(rng.random_range(100..10000) as i128); + extprice.append_value(rng.random_range(1_000..1_000_000) as i128); + discount.append_value(rng.random_range(0..10000) as i128); + tax.append_value(rng.random_range(0..5000) as i128); + + retflag.append_value(return_flags[rng.random_range(0..return_flags.len())]); + linestatus.append_value(statuses[rng.random_range(0..statuses.len())]); + + let base_date = 10_000; + shipdate.append_value(base_date + (i % 1000)); + commitdate.append_value(base_date + (i % 1000) + 1); + receiptdate.append_value(base_date + (i % 1000) + 2); + + shipinstruct.append_value(instructs[rng.random_range(0..instructs.len())]); + shipmode.append_value(modes[rng.random_range(0..modes.len())]); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(linenum.finish()), + Arc::new(suppkey.finish()), + Arc::new(orderkey.finish()), + Arc::new(partkey.finish()), + Arc::new(quantity.finish()), + Arc::new(extprice.finish()), + Arc::new(discount.finish()), + Arc::new(tax.finish()), + Arc::new(retflag.finish()), + Arc::new(linestatus.finish()), + Arc::new(shipdate.finish()), + Arc::new(commitdate.finish()), + Arc::new(receiptdate.finish()), + Arc::new(shipinstruct.finish()), + Arc::new(shipmode.finish()), + ], + ) + .unwrap(); + batches.push(batch); + } + (schema, 
batches) +} + +// Benchmarks spill write + read performance across multiple compression codecs +// using realistic input data inspired by TPC-H aggregate spill scenarios. +// +// This function prepares synthetic RecordBatches that mimic the schema and distribution +// of intermediate aggregate results from representative TPC-H queries (Q2, Q16, Q20) and sort-tpch Q10 +// For each dataset: +// - It evaluates spill performance under different compression codecs (e.g., Uncompressed, Zstd, LZ4). +// - It measures end-to-end spill write + read performance using Criterion. +// - It prints the observed memory-to-disk compression ratio for each codec. +// +// This helps evaluate the tradeoffs between compression ratio and runtime overhead for various codecs. +fn bench_spill_compression(c: &mut Criterion) { + let env = Arc::new(RuntimeEnv::default()); + let mut group = c.benchmark_group("spill_compression"); + let rt = Runtime::new().unwrap(); + let compressions = vec![ + SpillCompression::Uncompressed, + SpillCompression::Zstd, + SpillCompression::Lz4Frame, + ]; + + // Modify this value to change data volume. Note that each batch contains 8192 rows. 
+ let num_batches = 50; + + // Q2 [Int64, Decimal128] + let (schema, batches) = create_q2_like_batches(50); Review Comment: ```suggestion let (schema, batches) = create_q2_like_batches(num_batches); ``` and same for the below two functions ########## datafusion/physical-plan/benches/spill_io.rs: ########## @@ -119,5 +126,414 @@ fn bench_spill_io(c: &mut Criterion) { group.finish(); } -criterion_group!(benches, bench_spill_io); +// Generate `num_batches` RecordBatches mimicking TPC-H Q2's partial aggregate result: +// GROUP BY ps_partkey -> MIN(ps_supplycost) +fn create_q2_like_batches(num_batches: usize) -> (Arc<Schema>, Vec<RecordBatch>) { + // use fixed seed + let seed = 2; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(num_batches); + + let mut current_key = 400000_i64; + + let schema = Arc::new(Schema::new(vec![ + Field::new("ps_partkey", DataType::Int64, false), + Field::new("min_ps_supplycost", DataType::Decimal128(15, 2), true), + ])); + + for _ in 0..num_batches { + let mut partkey_builder = Int64Builder::new(); + let mut cost_builder = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + + for _ in 0..8192 { + // Occasionally skip a few partkey values to simulate sparsity + let jump = if rng.random_bool(0.05) { + rng.random_range(2..10) + } else { + 1 + }; + current_key += jump; + + let supply_cost = rng.random_range(10_00..100_000) as i128; + + partkey_builder.append_value(current_key); + cost_builder.append_value(supply_cost); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(partkey_builder.finish()), + Arc::new(cost_builder.finish()), + ], + ) + .unwrap(); + + batches.push(batch); + } + + (schema, batches) +} + +/// Generate `num_batches` RecordBatches mimicking TPC-H Q16's partial aggregate result: +/// GROUP BY (p_brand, p_type, p_size) -> COUNT(DISTINCT ps_suppkey) +pub fn create_q16_like_batches(num_batches: usize) -> (Arc<Schema>, 
Vec<RecordBatch>) { + let seed = 16; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(num_batches); + + let schema = Arc::new(Schema::new(vec![ + Field::new("p_brand", DataType::Utf8, false), + Field::new("p_type", DataType::Utf8, false), + Field::new("p_size", DataType::Int32, false), + Field::new("alias1", DataType::Int64, false), // COUNT(DISTINCT ps_suppkey) + ])); + + // Representative string pools + let brands = ["Brand#32", "Brand#33", "Brand#41", "Brand#42", "Brand#55"]; + let types = [ + "PROMO ANODIZED NICKEL", + "STANDARD BRUSHED NICKEL", + "PROMO POLISHED COPPER", + "ECONOMY ANODIZED BRASS", + "LARGE BURNISHED COPPER", + "STANDARD POLISHED TIN", + "SMALL PLATED STEEL", + "MEDIUM POLISHED COPPER", + ]; + let sizes = [3, 9, 14, 19, 23, 36, 45, 49]; + + for _ in 0..num_batches { + let mut brand_builder = StringBuilder::new(); + let mut type_builder = StringBuilder::new(); + let mut size_builder = Int32Builder::new(); + let mut count_builder = Int64Builder::new(); + + for _ in 0..8192 { + let brand = brands[rng.random_range(0..brands.len())]; + let ptype = types[rng.random_range(0..types.len())]; + let size = sizes[rng.random_range(0..sizes.len())]; + let count = rng.random_range(1000..100_000); + + brand_builder.append_value(brand); + type_builder.append_value(ptype); + size_builder.append_value(size); + count_builder.append_value(count); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(brand_builder.finish()), + Arc::new(type_builder.finish()), + Arc::new(size_builder.finish()), + Arc::new(count_builder.finish()), + ], + ) + .unwrap(); + + batches.push(batch); + } + + (schema, batches) +} + +// Generate `num_batches` RecordBatches mimicking TPC-H Q20's partial aggregate result: +// GROUP BY (l_partkey, l_suppkey) -> SUM(l_quantity) +fn create_q20_like_batches(num_batches: usize) -> (Arc<Schema>, Vec<RecordBatch>) { + let seed = 20; + let mut rng = 
rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(num_batches); + + let mut current_partkey = 400000_i64; + + let schema = Arc::new(Schema::new(vec![ + Field::new("l_partkey", DataType::Int64, false), + Field::new("l_suppkey", DataType::Int64, false), + Field::new("sum_l_quantity", DataType::Decimal128(25, 2), true), + ])); + + for _ in 0..num_batches { + let mut partkey_builder = Int64Builder::new(); + let mut suppkey_builder = Int64Builder::new(); + let mut quantity_builder = Decimal128Builder::new() + .with_precision_and_scale(25, 2) + .unwrap(); + + for _ in 0..8192 { + // Occasionally skip a few partkey values to simulate sparsity + let partkey_jump = if rng.random_bool(0.03) { + rng.random_range(2..6) + } else { + 1 + }; + current_partkey += partkey_jump; + + let suppkey = rng.random_range(10_000..99_999); + let quantity = rng.random_range(500..20_000) as i128; + + partkey_builder.append_value(current_partkey); + suppkey_builder.append_value(suppkey); + quantity_builder.append_value(quantity); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(partkey_builder.finish()), + Arc::new(suppkey_builder.finish()), + Arc::new(quantity_builder.finish()), + ], + ) + .unwrap(); + + batches.push(batch); + } + + (schema, batches) +} + +/// Genereate `num_batches` wide RecordBatches resembling sort-tpch Q10 for benchmarking. +/// This includes multiple numeric, date, and Utf8View columns (15 total). 
+pub fn create_wide_batches(num_batches: usize) -> (Arc<Schema>, Vec<RecordBatch>) { + let seed = 10; + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); + let mut batches = Vec::with_capacity(num_batches); + + let schema = Arc::new(Schema::new(vec![ + Field::new("l_linenumber", DataType::Int32, false), + Field::new("l_suppkey", DataType::Int64, false), + Field::new("l_orderkey", DataType::Int64, false), + Field::new("l_partkey", DataType::Int64, false), + Field::new("l_quantity", DataType::Decimal128(15, 2), false), + Field::new("l_extendedprice", DataType::Decimal128(15, 2), false), + Field::new("l_discount", DataType::Decimal128(15, 2), false), + Field::new("l_tax", DataType::Decimal128(15, 2), false), + Field::new("l_returnflag", DataType::Utf8, false), + Field::new("l_linestatus", DataType::Utf8, false), + Field::new("l_shipdate", DataType::Date32, false), + Field::new("l_commitdate", DataType::Date32, false), + Field::new("l_receiptdate", DataType::Date32, false), + Field::new("l_shipinstruct", DataType::Utf8, false), + Field::new("l_shipmode", DataType::Utf8, false), + ])); + + for _ in 0..num_batches { + let mut linenum = Int32Builder::new(); + let mut suppkey = Int64Builder::new(); + let mut orderkey = Int64Builder::new(); + let mut partkey = Int64Builder::new(); + let mut quantity = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut extprice = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut discount = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut tax = Decimal128Builder::new() + .with_precision_and_scale(15, 2) + .unwrap(); + let mut retflag = StringBuilder::new(); + let mut linestatus = StringBuilder::new(); + let mut shipdate = Date32Builder::new(); + let mut commitdate = Date32Builder::new(); + let mut receiptdate = Date32Builder::new(); + let mut shipinstruct = StringBuilder::new(); + let mut shipmode = StringBuilder::new(); + + let 
return_flags = ["A", "N", "R"]; + let statuses = ["F", "O"]; + let instructs = ["DELIVER IN PERSON", "COLLECT COD", "NONE"]; + let modes = ["TRUCK", "MAIL", "SHIP", "RAIL", "AIR"]; + + for i in 0..8192 { + linenum.append_value(i % 7); + suppkey.append_value(rng.random_range(0..100_000)); + orderkey.append_value(1_000_000 + i as i64); + partkey.append_value(rng.random_range(0..200_000)); + + quantity.append_value(rng.random_range(100..10000) as i128); + extprice.append_value(rng.random_range(1_000..1_000_000) as i128); + discount.append_value(rng.random_range(0..10000) as i128); + tax.append_value(rng.random_range(0..5000) as i128); + + retflag.append_value(return_flags[rng.random_range(0..return_flags.len())]); + linestatus.append_value(statuses[rng.random_range(0..statuses.len())]); + + let base_date = 10_000; + shipdate.append_value(base_date + (i % 1000)); + commitdate.append_value(base_date + (i % 1000) + 1); + receiptdate.append_value(base_date + (i % 1000) + 2); + + shipinstruct.append_value(instructs[rng.random_range(0..instructs.len())]); + shipmode.append_value(modes[rng.random_range(0..modes.len())]); + } + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(linenum.finish()), + Arc::new(suppkey.finish()), + Arc::new(orderkey.finish()), + Arc::new(partkey.finish()), + Arc::new(quantity.finish()), + Arc::new(extprice.finish()), + Arc::new(discount.finish()), + Arc::new(tax.finish()), + Arc::new(retflag.finish()), + Arc::new(linestatus.finish()), + Arc::new(shipdate.finish()), + Arc::new(commitdate.finish()), + Arc::new(receiptdate.finish()), + Arc::new(shipinstruct.finish()), + Arc::new(shipmode.finish()), + ], + ) + .unwrap(); + batches.push(batch); + } + (schema, batches) +} + +// Benchmarks spill write + read performance across multiple compression codecs +// using realistic input data inspired by TPC-H aggregate spill scenarios. 
+// +// This function prepares synthetic RecordBatches that mimic the schema and distribution +// of intermediate aggregate results from representative TPC-H queries (Q2, Q16, Q20) and sort-tpch Q10 Review Comment: Maybe we could also copy the schemas of each workload from the implementation comments into the doc comment here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org