This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8c65a41fe4 Expand sql_planner benchmark for benchmarking physical and
logical optimization. (#17276)
8c65a41fe4 is described below
commit 8c65a41fe42cb0e5ebac9c8a83a794754e5e3545
Author: Bruce Ritchie <[email protected]>
AuthorDate: Sun Aug 24 06:46:34 2025 -0400
Expand sql_planner benchmark for benchmarking physical and logical
optimization. (#17276)
* Updating physical and adding a logical benchmark for large # of columns.
* Removed unused import.
---
datafusion/core/benches/sql_planner.rs | 225 +++++++++++++++++++++++++++++++--
1 file changed, 212 insertions(+), 13 deletions(-)
diff --git a/datafusion/core/benches/sql_planner.rs
b/datafusion/core/benches/sql_planner.rs
index d02478d2b4..b8413344e4 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -25,11 +25,18 @@ mod data_utils;
use crate::criterion::Criterion;
use arrow::array::{ArrayRef, RecordBatch};
use arrow::datatypes::{DataType, Field, Fields, Schema};
+use arrow_schema::TimeUnit::Nanosecond;
use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
+use datafusion::prelude::DataFrame;
use datafusion_common::ScalarValue;
-use datafusion_expr::col;
+use datafusion_expr::Expr::Literal;
+use datafusion_expr::{cast, col, lit, not, try_cast, when};
+use datafusion_functions::expr_fn::{
+ btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
+};
+use std::ops::Rem;
use std::path::PathBuf;
use std::sync::Arc;
use test_utils::tpcds::tpcds_schemas;
@@ -58,6 +65,150 @@ fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql:
&str) {
}));
}
+/// Build a dataframe for testing logical plan optimization
+fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
+ register_string_table(ctx, 100, 1000);
+
+ rt.block_on(async {
+ let mut df = ctx.table("t").await.unwrap();
+ // add some columns in
+ for i in 100..150 {
+ df = df
+ .with_column(&format!("c{i}"),
Literal(ScalarValue::Utf8(None), None))
+ .unwrap();
+ }
+ // add in some columns with string encoded timestamps
+ for i in 150..175 {
+ df = df
+ .with_column(
+ &format!("c{i}"),
+ Literal(ScalarValue::Utf8(Some("2025-08-21
09:43:17".into())), None),
+ )
+ .unwrap();
+ }
+ // do a bunch of ops on the columns
+ for i in 0..175 {
+ // trim the columns
+ df = df
+ .with_column(&format!("c{i}"),
btrim(vec![col(format!("c{i}"))]))
+ .unwrap();
+ }
+
+ for i in 0..175 {
+ let c_name = format!("c{i}");
+ let c = col(&c_name);
+
+ // random ops
+ if i % 5 == 0 && i < 150 {
+ // the actual ops here are largely unimportant as they are
just a sample
+ // of ops that could occur on a dataframe
+ df = df
+ .with_column(&c_name, cast(c.clone(), DataType::Utf8))
+ .unwrap()
+ .with_column(
+ &c_name,
+ when(
+ cast(c.clone(), DataType::Int32).gt(lit(135)),
+ cast(
+ cast(c.clone(), DataType::Int32) - lit(i + 3),
+ DataType::Utf8,
+ ),
+ )
+ .otherwise(c.clone())
+ .unwrap(),
+ )
+ .unwrap()
+ .with_column(
+ &c_name,
+ when(
+ c.clone().is_not_null().and(
+ cast(c.clone(), DataType::Int32)
+ .between(lit(120), lit(130)),
+ ),
+ Literal(ScalarValue::Utf8(None), None),
+ )
+ .otherwise(
+ when(
+ c.clone().is_not_null().and(regexp_like(
+ cast(c.clone(), DataType::Utf8View),
+ lit("[0-9]*"),
+ None,
+ )),
+ upper(c.clone()),
+ )
+ .otherwise(c.clone())
+ .unwrap(),
+ )
+ .unwrap(),
+ )
+ .unwrap()
+ .with_column(
+ &c_name,
+ when(
+ c.clone().is_not_null().and(
+ cast(c.clone(), DataType::Int32)
+ .between(lit(90), lit(100)),
+ ),
+ cast(c.clone(), DataType::Utf8View),
+ )
+ .otherwise(Literal(ScalarValue::Date32(None), None))
+ .unwrap(),
+ )
+ .unwrap()
+ .with_column(
+ &c_name,
+ when(
+ c.clone().is_not_null().and(
+ cast(c.clone(),
DataType::Int32).rem(lit(10)).gt(lit(7)),
+ ),
+ regexp_replace(
+ cast(c.clone(), DataType::Utf8View),
+ lit("1"),
+ lit("a"),
+ None,
+ ),
+ )
+ .otherwise(Literal(ScalarValue::Date32(None), None))
+ .unwrap(),
+ )
+ .unwrap()
+ }
+ if i >= 150 {
+ df = df
+ .with_column(
+ &c_name,
+ try_cast(
+ to_timestamp(vec![c.clone(), lit("%Y-%m-%d
%H:%M:%S")]),
+ DataType::Timestamp(Nanosecond,
Some("UTC".into())),
+ ),
+ )
+ .unwrap()
+ .with_column(&c_name, try_cast(c.clone(),
DataType::Date32))
+ .unwrap()
+ }
+
+ // add in a few unions
+ if i % 30 == 0 {
+ let df1 = df
+ .clone()
+ .filter(length(c.clone()).gt(lit(2)))
+ .unwrap()
+ .with_column(&format!("c{i}_filtered"), lit(true))
+ .unwrap();
+ let df2 = df
+ .filter(not(length(c.clone()).gt(lit(2))))
+ .unwrap()
+ .with_column(&format!("c{i}_filtered"), lit(false))
+ .unwrap();
+
+ df = df1.union_by_name(df2).unwrap()
+ }
+ }
+
+ df
+ })
+}
+
/// Create schema with the specified number of columns
fn create_schema(column_prefix: &str, num_columns: usize) -> Schema {
let fields: Fields = (0..num_columns)
@@ -180,13 +331,40 @@ fn register_union_order_table(ctx: &SessionContext,
num_columns: usize, num_rows
ctx.register_table("t", Arc::new(table)).unwrap();
}
+/// Registers a table like this:
+/// c0,c1,c2...,c99
+/// "0","100"..."9900"
+/// "0","200"..."19800"
+/// "0","300"..."29700"
+fn register_string_table(ctx: &SessionContext, num_columns: usize, num_rows:
usize) {
+ // ("c0", ["0", "0", ...])
+ // ("c1": ["100", "200", ...])
+ // etc
+ let iter = (0..num_columns).map(|i| i as u64).map(|i| {
+ let array: ArrayRef =
Arc::new(arrow::array::StringViewArray::from_iter_values(
+ (0..num_rows)
+ .map(|j| format!("c{}", j as u64 * 100 + i))
+ .collect::<Vec<_>>(),
+ ));
+ (format!("c{i}"), array)
+ });
+ let batch = RecordBatch::try_from_iter(iter).unwrap();
+ let schema = batch.schema();
+ let partitions = vec![vec![batch]];
+
+ // create the table
+ let table = MemTable::try_new(schema, partitions).unwrap();
+
+ ctx.register_table("t", Arc::new(table)).unwrap();
+}
+
/// return a query like
/// ```sql
-/// select c1, null as c2, ... null as cn from t ORDER BY c1
+/// select c1, 2 as c2, ... n as cn from t ORDER BY c1
/// UNION ALL
-/// select null as c1, c2, ... null as cn from t ORDER BY c2
+/// select 1 as c1, c2, ... n as cn from t ORDER BY c2
/// ...
-/// select null as c1, null as c2, ... cn from t ORDER BY cn
+/// select 1 as c1, 2 as c2, ... cn from t ORDER BY cn
/// ORDER BY c1, c2 ... CN
/// ```
fn union_orderby_query(n: usize) -> String {
@@ -200,7 +378,7 @@ fn union_orderby_query(n: usize) -> String {
if i == j {
format!("c{j}")
} else {
- format!("null as c{j}")
+ format!("{j} as c{j}")
}
})
.collect::<Vec<_>>()
@@ -370,16 +548,37 @@ fn criterion_benchmark(c: &mut Criterion) {
});
// -- Sorted Queries --
- register_union_order_table(&ctx, 100, 1000);
-
- // this query has many expressions in its sort order so stresses
- // order equivalence validation
- c.bench_function("physical_sorted_union_orderby", |b| {
- // SELECT ... UNION ALL ...
- let query = union_orderby_query(20);
- b.iter(|| physical_plan(&ctx, &rt, &query))
+ for column_count in [10, 50, 100, 200, 300] {
+ register_union_order_table(&ctx, column_count, 1000);
+
+ // this query has many expressions in its sort order so stresses
+ // order equivalence validation
+ c.bench_function(
+ &format!("physical_sorted_union_order_by_{column_count}"),
+ |b| {
+ // SELECT ... UNION ALL ...
+ let query = union_orderby_query(column_count);
+ b.iter(|| physical_plan(&ctx, &rt, &query))
+ },
+ );
+
+ let _ = ctx.deregister_table("t");
+ }
+
+ // -- validate logical plan optimize performance
+ let df = build_test_data_frame(&ctx, &rt);
+
+ c.bench_function("logical_plan_optimize", |b| {
+ b.iter(|| {
+ let df_clone = df.clone();
+ criterion::black_box(
+ rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
+ );
+ })
});
+ let _ = ctx.deregister_table("t");
+
// --- TPC-H ---
let tpch_ctx = register_defs(SessionContext::new(), tpch_schemas());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]