This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bde9803e42 FIX : some benchmarks are failing (#15367)
bde9803e42 is described below

commit bde9803e424dead6672aebe590403e8615dee216
Author: Namgung Chan <[email protected]>
AuthorDate: Tue Apr 1 03:28:33 2025 +0900

    FIX : some benchmarks are failing (#15367)
    
    * distinct_query_sql, topk_aggregate
    
    * cargo clippy
    
    * cargo fmt
    
    * share runtime
---
 datafusion/core/benches/distinct_query_sql.rs | 56 ++++++++-------------
 datafusion/core/benches/topk_aggregate.rs     | 70 ++++++++++++++++-----------
 2 files changed, 63 insertions(+), 63 deletions(-)

diff --git a/datafusion/core/benches/distinct_query_sql.rs 
b/datafusion/core/benches/distinct_query_sql.rs
index c242798a56..ccc6a0e746 100644
--- a/datafusion/core/benches/distinct_query_sql.rs
+++ b/datafusion/core/benches/distinct_query_sql.rs
@@ -118,12 +118,9 @@ async fn distinct_with_limit(
     Ok(())
 }
 
-fn run(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
-    let rt = Runtime::new().unwrap();
-    criterion::black_box(
-        rt.block_on(async { distinct_with_limit(plan.clone(), 
ctx.clone()).await }),
-    )
-    .unwrap();
+fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) {
+    criterion::black_box(rt.block_on(distinct_with_limit(plan.clone(), 
ctx.clone())))
+        .unwrap();
 }
 
 pub async fn create_context_sampled_data(
@@ -145,58 +142,47 @@ pub async fn create_context_sampled_data(
 
 fn criterion_benchmark_limited_distinct_sampled(c: &mut Criterion) {
     let rt = Runtime::new().unwrap();
-
     let limit = 10;
     let partitions = 100;
     let samples = 100_000;
     let sql =
         format!("select DISTINCT trace_id from traces group by trace_id limit 
{limit};");
-
-    let distinct_trace_id_100_partitions_100_000_samples_limit_100 = 
rt.block_on(async {
-        create_context_sampled_data(sql.as_str(), partitions, samples)
-            .await
-            .unwrap()
-    });
-
     c.bench_function(
         format!("distinct query with {} partitions and {} samples per 
partition with limit {}", partitions, samples, limit).as_str(),
-        |b| b.iter(|| 
run(distinct_trace_id_100_partitions_100_000_samples_limit_100.0.clone(),
-                                   
distinct_trace_id_100_partitions_100_000_samples_limit_100.1.clone())),
+        |b| b.iter(|| {
+            let (plan, ctx) = rt.block_on(
+                create_context_sampled_data(sql.as_str(), partitions, samples)
+            ).unwrap();
+            run(&rt, plan.clone(), ctx.clone())
+        }),
     );
 
     let partitions = 10;
     let samples = 1_000_000;
     let sql =
         format!("select DISTINCT trace_id from traces group by trace_id limit 
{limit};");
-
-    let distinct_trace_id_10_partitions_1_000_000_samples_limit_10 = 
rt.block_on(async {
-        create_context_sampled_data(sql.as_str(), partitions, samples)
-            .await
-            .unwrap()
-    });
-
     c.bench_function(
         format!("distinct query with {} partitions and {} samples per 
partition with limit {}", partitions, samples, limit).as_str(),
-        |b| b.iter(|| 
run(distinct_trace_id_10_partitions_1_000_000_samples_limit_10.0.clone(),
-                                   
distinct_trace_id_10_partitions_1_000_000_samples_limit_10.1.clone())),
+        |b| b.iter(|| {
+            let (plan, ctx) = rt.block_on(
+                create_context_sampled_data(sql.as_str(), partitions, samples)
+            ).unwrap();
+            run(&rt, plan.clone(), ctx.clone())
+        }),
     );
 
     let partitions = 1;
     let samples = 10_000_000;
     let sql =
         format!("select DISTINCT trace_id from traces group by trace_id limit 
{limit};");
-
-    let rt = Runtime::new().unwrap();
-    let distinct_trace_id_1_partition_10_000_000_samples_limit_10 = 
rt.block_on(async {
-        create_context_sampled_data(sql.as_str(), partitions, samples)
-            .await
-            .unwrap()
-    });
-
     c.bench_function(
         format!("distinct query with {} partitions and {} samples per 
partition with limit {}", partitions, samples, limit).as_str(),
-        |b| b.iter(|| 
run(distinct_trace_id_1_partition_10_000_000_samples_limit_10.0.clone(),
-                                   
distinct_trace_id_1_partition_10_000_000_samples_limit_10.1.clone())),
+        |b| b.iter(|| {
+            let (plan, ctx) = rt.block_on(
+                create_context_sampled_data(sql.as_str(), partitions, samples)
+            ).unwrap();
+            run(&rt, plan.clone(), ctx.clone())
+        }),
     );
 }
 
diff --git a/datafusion/core/benches/topk_aggregate.rs 
b/datafusion/core/benches/topk_aggregate.rs
index 922cbd2b42..777d586b34 100644
--- a/datafusion/core/benches/topk_aggregate.rs
+++ b/datafusion/core/benches/topk_aggregate.rs
@@ -55,8 +55,7 @@ async fn create_context(
     Ok((physical_plan, ctx.task_ctx()))
 }
 
-fn run(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: bool) {
-    let rt = Runtime::new().unwrap();
+fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: 
bool) {
     criterion::black_box(
         rt.block_on(async { aggregate(plan.clone(), ctx.clone(), asc).await }),
     )
@@ -99,40 +98,37 @@ async fn aggregate(
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
+    let rt = Runtime::new().unwrap();
     let limit = 10;
     let partitions = 10;
     let samples = 1_000_000;
 
-    let rt = Runtime::new().unwrap();
-    let topk_real = rt.block_on(async {
-        create_context(limit, partitions, samples, false, true)
-            .await
-            .unwrap()
-    });
-    let topk_asc = rt.block_on(async {
-        create_context(limit, partitions, samples, true, true)
-            .await
-            .unwrap()
-    });
-    let real = rt.block_on(async {
-        create_context(limit, partitions, samples, false, false)
-            .await
-            .unwrap()
-    });
-    let asc = rt.block_on(async {
-        create_context(limit, partitions, samples, true, false)
-            .await
-            .unwrap()
-    });
-
     c.bench_function(
         format!("aggregate {} time-series rows", partitions * 
samples).as_str(),
-        |b| b.iter(|| run(real.0.clone(), real.1.clone(), false)),
+        |b| {
+            b.iter(|| {
+                let real = rt.block_on(async {
+                    create_context(limit, partitions, samples, false, false)
+                        .await
+                        .unwrap()
+                });
+                run(&rt, real.0.clone(), real.1.clone(), false)
+            })
+        },
     );
 
     c.bench_function(
         format!("aggregate {} worst-case rows", partitions * samples).as_str(),
-        |b| b.iter(|| run(asc.0.clone(), asc.1.clone(), true)),
+        |b| {
+            b.iter(|| {
+                let asc = rt.block_on(async {
+                    create_context(limit, partitions, samples, true, false)
+                        .await
+                        .unwrap()
+                });
+                run(&rt, asc.0.clone(), asc.1.clone(), true)
+            })
+        },
     );
 
     c.bench_function(
@@ -141,7 +137,16 @@ fn criterion_benchmark(c: &mut Criterion) {
             partitions * samples
         )
         .as_str(),
-        |b| b.iter(|| run(topk_real.0.clone(), topk_real.1.clone(), false)),
+        |b| {
+            b.iter(|| {
+                let topk_real = rt.block_on(async {
+                    create_context(limit, partitions, samples, false, true)
+                        .await
+                        .unwrap()
+                });
+                run(&rt, topk_real.0.clone(), topk_real.1.clone(), false)
+            })
+        },
     );
 
     c.bench_function(
@@ -150,7 +155,16 @@ fn criterion_benchmark(c: &mut Criterion) {
             partitions * samples
         )
         .as_str(),
-        |b| b.iter(|| run(topk_asc.0.clone(), topk_asc.1.clone(), true)),
+        |b| {
+            b.iter(|| {
+                let topk_asc = rt.block_on(async {
+                    create_context(limit, partitions, samples, true, true)
+                        .await
+                        .unwrap()
+                });
+                run(&rt, topk_asc.0.clone(), topk_asc.1.clone(), true)
+            })
+        },
     );
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to