This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bde9803e42 FIX : some benchmarks are failing (#15367)
bde9803e42 is described below
commit bde9803e424dead6672aebe590403e8615dee216
Author: Namgung Chan <[email protected]>
AuthorDate: Tue Apr 1 03:28:33 2025 +0900
FIX : some benchmarks are failing (#15367)
* distinct_query_sql, topk_aggregate
* cargo clippy
* cargo fmt
* share runtime
---
datafusion/core/benches/distinct_query_sql.rs | 56 ++++++++-------------
datafusion/core/benches/topk_aggregate.rs | 70 ++++++++++++++++-----------
2 files changed, 63 insertions(+), 63 deletions(-)
diff --git a/datafusion/core/benches/distinct_query_sql.rs b/datafusion/core/benches/distinct_query_sql.rs
index c242798a56..ccc6a0e746 100644
--- a/datafusion/core/benches/distinct_query_sql.rs
+++ b/datafusion/core/benches/distinct_query_sql.rs
@@ -118,12 +118,9 @@ async fn distinct_with_limit(
Ok(())
}
-fn run(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
- let rt = Runtime::new().unwrap();
- criterion::black_box(
- rt.block_on(async { distinct_with_limit(plan.clone(),
ctx.clone()).await }),
- )
- .unwrap();
+fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
+ criterion::black_box(rt.block_on(distinct_with_limit(plan.clone(), ctx.clone())))
+ .unwrap();
}
pub async fn create_context_sampled_data(
@@ -145,58 +142,47 @@ pub async fn create_context_sampled_data(
fn criterion_benchmark_limited_distinct_sampled(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
-
let limit = 10;
let partitions = 100;
let samples = 100_000;
let sql =
format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
-
- let distinct_trace_id_100_partitions_100_000_samples_limit_100 =
rt.block_on(async {
- create_context_sampled_data(sql.as_str(), partitions, samples)
- .await
- .unwrap()
- });
-
c.bench_function(
format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
- |b| b.iter(||
run(distinct_trace_id_100_partitions_100_000_samples_limit_100.0.clone(),
-
distinct_trace_id_100_partitions_100_000_samples_limit_100.1.clone())),
+ |b| b.iter(|| {
+ let (plan, ctx) = rt.block_on(
+ create_context_sampled_data(sql.as_str(), partitions, samples)
+ ).unwrap();
+ run(&rt, plan.clone(), ctx.clone())
+ }),
);
let partitions = 10;
let samples = 1_000_000;
let sql =
format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
-
- let distinct_trace_id_10_partitions_1_000_000_samples_limit_10 =
rt.block_on(async {
- create_context_sampled_data(sql.as_str(), partitions, samples)
- .await
- .unwrap()
- });
-
c.bench_function(
format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
- |b| b.iter(||
run(distinct_trace_id_10_partitions_1_000_000_samples_limit_10.0.clone(),
-
distinct_trace_id_10_partitions_1_000_000_samples_limit_10.1.clone())),
+ |b| b.iter(|| {
+ let (plan, ctx) = rt.block_on(
+ create_context_sampled_data(sql.as_str(), partitions, samples)
+ ).unwrap();
+ run(&rt, plan.clone(), ctx.clone())
+ }),
);
let partitions = 1;
let samples = 10_000_000;
let sql =
format!("select DISTINCT trace_id from traces group by trace_id limit {limit};");
-
- let rt = Runtime::new().unwrap();
- let distinct_trace_id_1_partition_10_000_000_samples_limit_10 =
rt.block_on(async {
- create_context_sampled_data(sql.as_str(), partitions, samples)
- .await
- .unwrap()
- });
-
c.bench_function(
format!("distinct query with {} partitions and {} samples per partition with limit {}", partitions, samples, limit).as_str(),
- |b| b.iter(||
run(distinct_trace_id_1_partition_10_000_000_samples_limit_10.0.clone(),
-
distinct_trace_id_1_partition_10_000_000_samples_limit_10.1.clone())),
+ |b| b.iter(|| {
+ let (plan, ctx) = rt.block_on(
+ create_context_sampled_data(sql.as_str(), partitions, samples)
+ ).unwrap();
+ run(&rt, plan.clone(), ctx.clone())
+ }),
);
}
diff --git a/datafusion/core/benches/topk_aggregate.rs b/datafusion/core/benches/topk_aggregate.rs
index 922cbd2b42..777d586b34 100644
--- a/datafusion/core/benches/topk_aggregate.rs
+++ b/datafusion/core/benches/topk_aggregate.rs
@@ -55,8 +55,7 @@ async fn create_context(
Ok((physical_plan, ctx.task_ctx()))
}
-fn run(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: bool) {
- let rt = Runtime::new().unwrap();
+fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: bool) {
criterion::black_box(
rt.block_on(async { aggregate(plan.clone(), ctx.clone(), asc).await }),
)
@@ -99,40 +98,37 @@ async fn aggregate(
}
fn criterion_benchmark(c: &mut Criterion) {
+ let rt = Runtime::new().unwrap();
let limit = 10;
let partitions = 10;
let samples = 1_000_000;
- let rt = Runtime::new().unwrap();
- let topk_real = rt.block_on(async {
- create_context(limit, partitions, samples, false, true)
- .await
- .unwrap()
- });
- let topk_asc = rt.block_on(async {
- create_context(limit, partitions, samples, true, true)
- .await
- .unwrap()
- });
- let real = rt.block_on(async {
- create_context(limit, partitions, samples, false, false)
- .await
- .unwrap()
- });
- let asc = rt.block_on(async {
- create_context(limit, partitions, samples, true, false)
- .await
- .unwrap()
- });
-
c.bench_function(
format!("aggregate {} time-series rows", partitions * samples).as_str(),
- |b| b.iter(|| run(real.0.clone(), real.1.clone(), false)),
+ |b| {
+ b.iter(|| {
+ let real = rt.block_on(async {
+ create_context(limit, partitions, samples, false, false)
+ .await
+ .unwrap()
+ });
+ run(&rt, real.0.clone(), real.1.clone(), false)
+ })
+ },
);
c.bench_function(
format!("aggregate {} worst-case rows", partitions * samples).as_str(),
- |b| b.iter(|| run(asc.0.clone(), asc.1.clone(), true)),
+ |b| {
+ b.iter(|| {
+ let asc = rt.block_on(async {
+ create_context(limit, partitions, samples, true, false)
+ .await
+ .unwrap()
+ });
+ run(&rt, asc.0.clone(), asc.1.clone(), true)
+ })
+ },
);
c.bench_function(
@@ -141,7 +137,16 @@ fn criterion_benchmark(c: &mut Criterion) {
partitions * samples
)
.as_str(),
- |b| b.iter(|| run(topk_real.0.clone(), topk_real.1.clone(), false)),
+ |b| {
+ b.iter(|| {
+ let topk_real = rt.block_on(async {
+ create_context(limit, partitions, samples, false, true)
+ .await
+ .unwrap()
+ });
+ run(&rt, topk_real.0.clone(), topk_real.1.clone(), false)
+ })
+ },
);
c.bench_function(
@@ -150,7 +155,16 @@ fn criterion_benchmark(c: &mut Criterion) {
partitions * samples
)
.as_str(),
- |b| b.iter(|| run(topk_asc.0.clone(), topk_asc.1.clone(), true)),
+ |b| {
+ b.iter(|| {
+ let topk_asc = rt.block_on(async {
+ create_context(limit, partitions, samples, true, true)
+ .await
+ .unwrap()
+ });
+ run(&rt, topk_asc.0.clone(), topk_asc.1.clone(), true)
+ })
+ },
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]