This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d056fb5f25 #15507 -- extract tokio runtime creation from hot loop (#15508)
d056fb5f25 is described below

commit d056fb5f255923a53ceec395c0ec57e005183c61
Author: Bruce Ritchie <[email protected]>
AuthorDate: Mon Mar 31 20:00:38 2025 -0400

    #15507 -- extract tokio runtime creation from hot loop (#15508)
---
 datafusion/core/benches/aggregate_query_sql.rs  | 19 ++++++++-
 datafusion/core/benches/csv_load.rs             | 11 +++++-
 datafusion/core/benches/dataframe.rs            |  8 ++--
 datafusion/core/benches/distinct_query_sql.rs   |  9 ++++-
 datafusion/core/benches/filter_query_sql.rs     |  9 +++--
 datafusion/core/benches/math_query_sql.rs       | 14 +++----
 datafusion/core/benches/physical_plan.rs        | 10 +++--
 datafusion/core/benches/sort_limit_query_sql.rs | 15 +++----
 datafusion/core/benches/sql_planner.rs          | 52 +++++++++++++++----------
 datafusion/core/benches/struct_query_sql.rs     |  9 ++---
 datafusion/core/benches/window_query_sql.rs     | 16 +++++++-
 11 files changed, 112 insertions(+), 60 deletions(-)

diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs
index ebe94450c1..b29bfc4873 100644
--- a/datafusion/core/benches/aggregate_query_sql.rs
+++ b/datafusion/core/benches/aggregate_query_sql.rs
@@ -29,8 +29,7 @@ use parking_lot::Mutex;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
 
-fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
-    let rt = Runtime::new().unwrap();
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
     let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
     criterion::black_box(rt.block_on(df.collect()).unwrap());
 }
@@ -51,11 +50,13 @@ fn criterion_benchmark(c: &mut Criterion) {
     let array_len = 32768 * 2; // 2^16
     let batch_size = 2048; // 2^11
     let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
+    let rt = Runtime::new().unwrap();
 
     c.bench_function("aggregate_query_no_group_by 15 12", |b| {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT MIN(f64), AVG(f64), COUNT(f64) \
                  FROM t",
             )
@@ -66,6 +67,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT MIN(f64), MAX(f64) \
                  FROM t",
             )
@@ -76,6 +78,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT COUNT(DISTINCT u64_wide) \
                  FROM t",
             )
@@ -86,6 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT COUNT(DISTINCT u64_narrow) \
                  FROM t",
             )
@@ -96,6 +100,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \
                  FROM t GROUP BY utf8",
             )
@@ -106,6 +111,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \
                  FROM t \
                  WHERE f32 > 10 AND f32 < 20 GROUP BY utf8",
@@ -117,6 +123,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \
                  FROM t GROUP BY u64_narrow",
             )
@@ -127,6 +134,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \
                  FROM t \
                  WHERE f32 > 10 AND f32 < 20 GROUP BY u64_narrow",
@@ -138,6 +146,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \
                  FROM t GROUP BY u64_wide, utf8",
             )
@@ -148,6 +157,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT utf8, approx_percentile_cont(u64_wide, 0.5, 2500)  \
                  FROM t GROUP BY utf8",
             )
@@ -158,6 +168,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT utf8, approx_percentile_cont(f32, 0.5, 2500)  \
                  FROM t GROUP BY utf8",
             )
@@ -168,6 +179,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT MEDIAN(DISTINCT u64_wide), MEDIAN(DISTINCT u64_narrow) \
                  FROM t",
             )
@@ -178,6 +190,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\
                             last_value(u64_wide order by f64, u64_narrow, utf8)  \
                  FROM t GROUP BY u64_narrow",
@@ -189,6 +202,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8),  \
                             last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8)    \
                  FROM t GROUP BY u64_narrow",
@@ -200,6 +214,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT first_value(u64_wide order by f64), \
                             last_value(u64_wide order by f64)   \
                 FROM t GROUP BY u64_narrow",
diff --git a/datafusion/core/benches/csv_load.rs b/datafusion/core/benches/csv_load.rs
index 2d42121ec9..3f98475746 100644
--- a/datafusion/core/benches/csv_load.rs
+++ b/datafusion/core/benches/csv_load.rs
@@ -32,8 +32,12 @@ use std::time::Duration;
 use test_utils::AccessLogGenerator;
 use tokio::runtime::Runtime;
 
-fn load_csv(ctx: Arc<Mutex<SessionContext>>, path: &str, options: CsvReadOptions) {
-    let rt = Runtime::new().unwrap();
+fn load_csv(
+    ctx: Arc<Mutex<SessionContext>>,
+    rt: &Runtime,
+    path: &str,
+    options: CsvReadOptions,
+) {
     let df = rt.block_on(ctx.lock().read_csv(path, options)).unwrap();
     criterion::black_box(rt.block_on(df.collect()).unwrap());
 }
@@ -61,6 +65,7 @@ fn generate_test_file() -> TestCsvFile {
 
 fn criterion_benchmark(c: &mut Criterion) {
     let ctx = create_context().unwrap();
+    let rt = Runtime::new().unwrap();
     let test_file = generate_test_file();
 
     let mut group = c.benchmark_group("load csv testing");
@@ -70,6 +75,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             load_csv(
                 ctx.clone(),
+                &rt,
                 test_file.path().to_str().unwrap(),
                 CsvReadOptions::default(),
             )
@@ -80,6 +86,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             load_csv(
                 ctx.clone(),
+                &rt,
                 test_file.path().to_str().unwrap(),
                 CsvReadOptions::default().null_regex(Some("^NULL$|^$".to_string())),
             )
diff --git a/datafusion/core/benches/dataframe.rs b/datafusion/core/benches/dataframe.rs
index 03078e05e1..832553ebed 100644
--- a/datafusion/core/benches/dataframe.rs
+++ b/datafusion/core/benches/dataframe.rs
@@ -44,9 +44,7 @@ fn create_context(field_count: u32) -> datafusion_common::Result<Arc<SessionCont
     Ok(Arc::new(ctx))
 }
 
-fn run(column_count: u32, ctx: Arc<SessionContext>) {
-    let rt = Runtime::new().unwrap();
-
+fn run(column_count: u32, ctx: Arc<SessionContext>, rt: &Runtime) {
     criterion::black_box(rt.block_on(async {
         let mut data_frame = ctx.table("t").await.unwrap();
 
@@ -67,11 +65,13 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
+    let rt = Runtime::new().unwrap();
+
     for column_count in [10, 100, 200, 500] {
         let ctx = create_context(column_count).unwrap();
 
         c.bench_function(&format!("with_column_{column_count}"), |b| {
-            b.iter(|| run(column_count, ctx.clone()))
+            b.iter(|| run(column_count, ctx.clone(), &rt))
         });
     }
 }
diff --git a/datafusion/core/benches/distinct_query_sql.rs b/datafusion/core/benches/distinct_query_sql.rs
index ccc6a0e746..4992ae6607 100644
--- a/datafusion/core/benches/distinct_query_sql.rs
+++ b/datafusion/core/benches/distinct_query_sql.rs
@@ -33,8 +33,7 @@ use parking_lot::Mutex;
 use std::{sync::Arc, time::Duration};
 use tokio::runtime::Runtime;
 
-fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
-    let rt = Runtime::new().unwrap();
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
     let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
     criterion::black_box(rt.block_on(df.collect()).unwrap());
 }
@@ -55,6 +54,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
     let array_len = 1 << 26; // 64 M
     let batch_size = 8192;
     let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
+    let rt = Runtime::new().unwrap();
 
     let mut group = c.benchmark_group("custom-measurement-time");
     group.measurement_time(Duration::from_secs(40));
@@ -63,6 +63,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10",
             )
         })
@@ -72,6 +73,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 100",
             )
         })
@@ -81,6 +83,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 1000",
             )
         })
@@ -90,6 +93,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10000",
             )
         })
@@ -99,6 +103,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT u64_narrow, u64_wide, utf8, f64 FROM t GROUP BY 1, 2, 3, 4 LIMIT 10",
             )
         })
diff --git a/datafusion/core/benches/filter_query_sql.rs b/datafusion/core/benches/filter_query_sql.rs
index 0e09ae09d7..c82a160718 100644
--- a/datafusion/core/benches/filter_query_sql.rs
+++ b/datafusion/core/benches/filter_query_sql.rs
@@ -27,9 +27,7 @@ use futures::executor::block_on;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
 
-async fn query(ctx: &SessionContext, sql: &str) {
-    let rt = Runtime::new().unwrap();
-
+async fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) {
     // execute the query
     let df = rt.block_on(ctx.sql(sql)).unwrap();
     criterion::black_box(rt.block_on(df.collect()).unwrap());
@@ -68,10 +66,11 @@ fn create_context(array_len: usize, batch_size: usize) -> Result<SessionContext>
 fn criterion_benchmark(c: &mut Criterion) {
     let array_len = 524_288; // 2^19
     let batch_size = 4096; // 2^12
+    let rt = Runtime::new().unwrap();
 
     c.bench_function("filter_array", |b| {
         let ctx = create_context(array_len, batch_size).unwrap();
-        b.iter(|| block_on(query(&ctx, "select f32, f64 from t where f32 >= f64")))
+        b.iter(|| block_on(query(&ctx, &rt, "select f32, f64 from t where f32 >= f64")))
     });
 
     c.bench_function("filter_scalar", |b| {
@@ -79,6 +78,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             block_on(query(
                 &ctx,
+                &rt,
                 "select f32, f64 from t where f32 >= 250 and f64 > 250",
             ))
         })
@@ -89,6 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             block_on(query(
                 &ctx,
+                &rt,
                 "select f32, f64 from t where f32 in (10, 20, 30, 40)",
             ))
         })
diff --git a/datafusion/core/benches/math_query_sql.rs b/datafusion/core/benches/math_query_sql.rs
index 92c59d5066..76824850c1 100644
--- a/datafusion/core/benches/math_query_sql.rs
+++ b/datafusion/core/benches/math_query_sql.rs
@@ -36,9 +36,7 @@ use datafusion::datasource::MemTable;
 use datafusion::error::Result;
 use datafusion::execution::context::SessionContext;
 
-fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
-    let rt = Runtime::new().unwrap();
-
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
     // execute the query
     let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
     rt.block_on(df.collect()).unwrap();
@@ -81,29 +79,31 @@ fn criterion_benchmark(c: &mut Criterion) {
     let array_len = 1048576; // 2^20
     let batch_size = 512; // 2^9
     let ctx = create_context(array_len, batch_size).unwrap();
+    let rt = Runtime::new().unwrap();
+
     c.bench_function("sqrt_20_9", |b| {
-        b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
+        b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
     });
 
     let array_len = 1048576; // 2^20
     let batch_size = 4096; // 2^12
     let ctx = create_context(array_len, batch_size).unwrap();
     c.bench_function("sqrt_20_12", |b| {
-        b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
+        b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
     });
 
     let array_len = 4194304; // 2^22
     let batch_size = 4096; // 2^12
     let ctx = create_context(array_len, batch_size).unwrap();
     c.bench_function("sqrt_22_12", |b| {
-        b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
+        b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
     });
 
     let array_len = 4194304; // 2^22
     let batch_size = 16384; // 2^14
     let ctx = create_context(array_len, batch_size).unwrap();
     c.bench_function("sqrt_22_14", |b| {
-        b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
+        b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
     });
 }
 
diff --git a/datafusion/core/benches/physical_plan.rs b/datafusion/core/benches/physical_plan.rs
index aae1457ab9..0a65c52f72 100644
--- a/datafusion/core/benches/physical_plan.rs
+++ b/datafusion/core/benches/physical_plan.rs
@@ -42,6 +42,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
 // as inputs. All record batches must have the same schema.
 fn sort_preserving_merge_operator(
     session_ctx: Arc<SessionContext>,
+    rt: &Runtime,
     batches: Vec<RecordBatch>,
     sort: &[&str],
 ) {
@@ -63,7 +64,6 @@ fn sort_preserving_merge_operator(
     .unwrap();
     let merge = Arc::new(SortPreservingMergeExec::new(sort, exec));
     let task_ctx = session_ctx.task_ctx();
-    let rt = Runtime::new().unwrap();
     rt.block_on(collect(merge, task_ctx)).unwrap();
 }
 
@@ -166,14 +166,16 @@ fn criterion_benchmark(c: &mut Criterion) {
     ];
 
     let ctx = Arc::new(SessionContext::new());
+    let rt = Runtime::new().unwrap();
+
     for (name, input) in benches {
-        let ctx_clone = ctx.clone();
-        c.bench_function(name, move |b| {
+        c.bench_function(name, |b| {
             b.iter_batched(
                 || input.clone(),
                 |input| {
                     sort_preserving_merge_operator(
-                        ctx_clone.clone(),
+                        ctx.clone(),
+                        &rt,
                         input,
                         &["a", "b", "c", "d"],
                     );
diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs
index cfd4b8bc4b..e535a01816 100644
--- a/datafusion/core/benches/sort_limit_query_sql.rs
+++ b/datafusion/core/benches/sort_limit_query_sql.rs
@@ -37,9 +37,7 @@ use datafusion::execution::context::SessionContext;
 
 use tokio::runtime::Runtime;
 
-fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
-    let rt = Runtime::new().unwrap();
-
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
     // execute the query
     let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
     rt.block_on(df.collect()).unwrap();
@@ -104,11 +102,14 @@ fn create_context() -> Arc<Mutex<SessionContext>> {
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
+    let ctx = create_context();
+    let rt = Runtime::new().unwrap();
+
     c.bench_function("sort_and_limit_by_int", |b| {
-        let ctx = create_context();
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT c1, c13, c6, c10 \
                  FROM aggregate_test_100 \
                  ORDER BY c6
@@ -118,10 +119,10 @@ fn criterion_benchmark(c: &mut Criterion) {
     });
 
     c.bench_function("sort_and_limit_by_float", |b| {
-        let ctx = create_context();
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT c1, c13, c12 \
                  FROM aggregate_test_100 \
                  ORDER BY c13
@@ -131,10 +132,10 @@ fn criterion_benchmark(c: &mut Criterion) {
     });
 
     c.bench_function("sort_and_limit_lex_by_int", |b| {
-        let ctx = create_context();
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT c1, c13, c6, c10 \
                  FROM aggregate_test_100 \
                  ORDER BY c6 DESC, c10 DESC
@@ -144,10 +145,10 @@ fn criterion_benchmark(c: &mut Criterion) {
     });
 
     c.bench_function("sort_and_limit_lex_by_string", |b| {
-        let ctx = create_context();
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT c1, c13, c6, c10 \
                  FROM aggregate_test_100 \
                  ORDER BY c1, c13
diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs
index 2d79778d4d..49cc830d58 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -45,14 +45,12 @@ const BENCHMARKS_PATH_2: &str = "./benchmarks/";
 const CLICKBENCH_DATA_PATH: &str = "data/hits_partitioned/";
 
 /// Create a logical plan from the specified sql
-fn logical_plan(ctx: &SessionContext, sql: &str) {
-    let rt = Runtime::new().unwrap();
+fn logical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) {
     criterion::black_box(rt.block_on(ctx.sql(sql)).unwrap());
 }
 
 /// Create a physical ExecutionPlan (by way of logical plan)
-fn physical_plan(ctx: &SessionContext, sql: &str) {
-    let rt = Runtime::new().unwrap();
+fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) {
     criterion::black_box(rt.block_on(async {
         ctx.sql(sql)
             .await
@@ -104,9 +102,8 @@ fn register_defs(ctx: SessionContext, defs: Vec<TableDef>) -> SessionContext {
     ctx
 }
 
-fn register_clickbench_hits_table() -> SessionContext {
+fn register_clickbench_hits_table(rt: &Runtime) -> SessionContext {
     let ctx = SessionContext::new();
-    let rt = Runtime::new().unwrap();
 
     // use an external table for clickbench benchmarks
     let path =
@@ -128,7 +125,11 @@ fn register_clickbench_hits_table() -> SessionContext {
 
 /// Target of this benchmark: control that placeholders replacing does not get slower,
 /// if the query does not contain placeholders at all.
-fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Bencher) {
+fn benchmark_with_param_values_many_columns(
+    ctx: &SessionContext,
+    rt: &Runtime,
+    b: &mut Bencher,
+) {
     const COLUMNS_NUM: usize = 200;
     let mut aggregates = String::new();
     for i in 0..COLUMNS_NUM {
@@ -140,7 +141,6 @@ fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Benche
     // SELECT max(attr0), ..., max(attrN) FROM t1.
     let query = format!("SELECT {} FROM t1", aggregates);
     let statement = ctx.state().sql_to_statement(&query, "Generic").unwrap();
-    let rt = Runtime::new().unwrap();
     let plan =
         rt.block_on(async { ctx.state().statement_to_plan(statement).await.unwrap() });
     b.iter(|| {
@@ -230,33 +230,35 @@ fn criterion_benchmark(c: &mut Criterion) {
     }
 
     let ctx = create_context();
+    let rt = Runtime::new().unwrap();
 
     // Test simplest
     // https://github.com/apache/datafusion/issues/5157
     c.bench_function("logical_select_one_from_700", |b| {
-        b.iter(|| logical_plan(&ctx, "SELECT c1 FROM t700"))
+        b.iter(|| logical_plan(&ctx, &rt, "SELECT c1 FROM t700"))
     });
 
     // Test simplest
     // https://github.com/apache/datafusion/issues/5157
     c.bench_function("physical_select_one_from_700", |b| {
-        b.iter(|| physical_plan(&ctx, "SELECT c1 FROM t700"))
+        b.iter(|| physical_plan(&ctx, &rt, "SELECT c1 FROM t700"))
     });
 
     // Test simplest
     c.bench_function("logical_select_all_from_1000", |b| {
-        b.iter(|| logical_plan(&ctx, "SELECT * FROM t1000"))
+        b.iter(|| logical_plan(&ctx, &rt, "SELECT * FROM t1000"))
     });
 
     // Test simplest
     c.bench_function("physical_select_all_from_1000", |b| {
-        b.iter(|| physical_plan(&ctx, "SELECT * FROM t1000"))
+        b.iter(|| physical_plan(&ctx, &rt, "SELECT * FROM t1000"))
     });
 
     c.bench_function("logical_trivial_join_low_numbered_columns", |b| {
         b.iter(|| {
             logical_plan(
                 &ctx,
+                &rt,
                 "SELECT t1.a2, t2.b2  \
                  FROM t1, t2 WHERE a1 = b1",
             )
@@ -267,6 +269,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             logical_plan(
                 &ctx,
+                &rt,
                 "SELECT t1.a99, t2.b99  \
                  FROM t1, t2 WHERE a199 = b199",
             )
@@ -277,6 +280,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             logical_plan(
                 &ctx,
+                &rt,
                 "SELECT t1.a99, MIN(t2.b1), MAX(t2.b199), AVG(t2.b123), COUNT(t2.b73)  \
                  FROM t1 JOIN t2 ON t1.a199 = t2.b199 GROUP BY t1.a99",
             )
@@ -293,7 +297,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         }
         let query = format!("SELECT {} FROM t1", aggregates);
         b.iter(|| {
-            physical_plan(&ctx, &query);
+            physical_plan(&ctx, &rt, &query);
         });
     });
 
@@ -302,6 +306,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             physical_plan(
                 &ctx,
+                &rt,
                 "SELECT t1.a7, t2.b8  \
                  FROM t1, t2 WHERE a7 = b7 \
                  ORDER BY a7",
@@ -313,6 +318,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             physical_plan(
                 &ctx,
+                &rt,
                 "SELECT t1.a7, t2.b8  \
                  FROM t1, t2 WHERE a7 < b7 \
                  ORDER BY a7",
@@ -324,6 +330,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             physical_plan(
                 &ctx,
+                &rt,
                 "SELECT ta.a9, tb.a10, tc.a11, td.a12, te.a13, tf.a14 \
                  FROM t1 AS ta, t1 AS tb, t1 AS tc, t1 AS td, t1 AS te, t1 AS tf \
                  WHERE ta.a9 = tb.a10 AND tb.a10 = tc.a11 AND tc.a11 = td.a12 AND \
@@ -336,6 +343,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             physical_plan(
                 &ctx,
+                &rt,
                 "SELECT t1.a7  \
                  FROM t1 WHERE a7 = (SELECT b8 FROM t2)",
             );
@@ -346,6 +354,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             physical_plan(
                 &ctx,
+                &rt,
                 "SELECT t1.a7 FROM t1  \
                  INTERSECT SELECT t2.b8 FROM t2",
             );
@@ -356,6 +365,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             logical_plan(
                 &ctx,
+                &rt,
                 "SELECT DISTINCT t1.a7  \
                  FROM t1, t2 WHERE t1.a7 = t2.b8",
             );
@@ -370,7 +380,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("physical_sorted_union_orderby", |b| {
         // SELECT ... UNION ALL ...
         let query = union_orderby_query(20);
-        b.iter(|| physical_plan(&ctx, &query))
+        b.iter(|| physical_plan(&ctx, &rt, &query))
     });
 
     // --- TPC-H ---
@@ -393,7 +403,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         let sql =
             std::fs::read_to_string(format!("{benchmarks_path}queries/{q}.sql")).unwrap();
         c.bench_function(&format!("physical_plan_tpch_{}", q), |b| {
-            b.iter(|| physical_plan(&tpch_ctx, &sql))
+            b.iter(|| physical_plan(&tpch_ctx, &rt, &sql))
         });
     }
 
@@ -407,7 +417,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("physical_plan_tpch_all", |b| {
         b.iter(|| {
             for sql in &all_tpch_sql_queries {
-                physical_plan(&tpch_ctx, sql)
+                physical_plan(&tpch_ctx, &rt, sql)
             }
         })
     });
@@ -442,7 +452,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("physical_plan_tpcds_all", |b| {
         b.iter(|| {
             for sql in &all_tpcds_sql_queries {
-                physical_plan(&tpcds_ctx, sql)
+                physical_plan(&tpcds_ctx, &rt, sql)
             }
         })
     });
@@ -468,7 +478,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         .map(|l| l.expect("Could not parse line"))
         .collect_vec();
 
-    let clickbench_ctx = register_clickbench_hits_table();
+    let clickbench_ctx = register_clickbench_hits_table(&rt);
 
     // for (i, sql) in clickbench_queries.iter().enumerate() {
     //     c.bench_function(&format!("logical_plan_clickbench_q{}", i + 1), |b| {
@@ -478,7 +488,7 @@ fn criterion_benchmark(c: &mut Criterion) {
 
     for (i, sql) in clickbench_queries.iter().enumerate() {
         c.bench_function(&format!("physical_plan_clickbench_q{}", i + 1), |b| {
-            b.iter(|| physical_plan(&clickbench_ctx, sql))
+            b.iter(|| physical_plan(&clickbench_ctx, &rt, sql))
         });
     }
 
@@ -493,13 +503,13 @@ fn criterion_benchmark(c: &mut Criterion) {
     c.bench_function("physical_plan_clickbench_all", |b| {
         b.iter(|| {
             for sql in &clickbench_queries {
-                physical_plan(&clickbench_ctx, sql)
+                physical_plan(&clickbench_ctx, &rt, sql)
             }
         })
     });
 
     c.bench_function("with_param_values_many_columns", |b| {
-        benchmark_with_param_values_many_columns(&ctx, b);
+        benchmark_with_param_values_many_columns(&ctx, &rt, b);
     });
 }
 
diff --git a/datafusion/core/benches/struct_query_sql.rs b/datafusion/core/benches/struct_query_sql.rs
index 3ef7292c66..f9cc43d1ea 100644
--- a/datafusion/core/benches/struct_query_sql.rs
+++ b/datafusion/core/benches/struct_query_sql.rs
@@ -27,9 +27,7 @@ use futures::executor::block_on;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
 
-async fn query(ctx: &SessionContext, sql: &str) {
-    let rt = Runtime::new().unwrap();
-
+async fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) {
     // execute the query
     let df = rt.block_on(ctx.sql(sql)).unwrap();
     criterion::black_box(rt.block_on(df.collect()).unwrap());
@@ -68,10 +66,11 @@ fn create_context(array_len: usize, batch_size: usize) -> Result<SessionContext>
 fn criterion_benchmark(c: &mut Criterion) {
     let array_len = 524_288; // 2^19
     let batch_size = 4096; // 2^12
+    let ctx = create_context(array_len, batch_size).unwrap();
+    let rt = Runtime::new().unwrap();
 
     c.bench_function("struct", |b| {
-        let ctx = create_context(array_len, batch_size).unwrap();
-        b.iter(|| block_on(query(&ctx, "select struct(f32, f64) from t")))
+        b.iter(|| block_on(query(&ctx, &rt, "select struct(f32, f64) from t")))
     });
 }
 
diff --git a/datafusion/core/benches/window_query_sql.rs b/datafusion/core/benches/window_query_sql.rs
index 42a1e51be3..a55d17a7c5 100644
--- a/datafusion/core/benches/window_query_sql.rs
+++ b/datafusion/core/benches/window_query_sql.rs
@@ -29,8 +29,7 @@ use parking_lot::Mutex;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
 
-fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
-    let rt = Runtime::new().unwrap();
+fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
     let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
     criterion::black_box(rt.block_on(df.collect()).unwrap());
 }
@@ -51,11 +50,13 @@ fn criterion_benchmark(c: &mut Criterion) {
     let array_len = 1024 * 1024;
     let batch_size = 8 * 1024;
     let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
+    let rt = Runtime::new().unwrap();
 
     c.bench_function("window empty over, aggregate functions", |b| {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT \
                     MAX(f64) OVER (), \
                     MIN(f32) OVER (), \
@@ -69,6 +70,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT \
                     FIRST_VALUE(f64) OVER (), \
                     LAST_VALUE(f32) OVER (), \
@@ -82,6 +84,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT \
                     MAX(f64) OVER (ORDER BY u64_narrow), \
                     MIN(f32) OVER (ORDER BY u64_narrow DESC), \
@@ -95,6 +98,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT \
                   FIRST_VALUE(f64) OVER (ORDER BY u64_narrow), \
                   LAST_VALUE(f32) OVER (ORDER BY u64_narrow DESC), \
@@ -108,6 +112,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT \
                   MAX(f64) OVER (PARTITION BY u64_wide), \
                   MIN(f32) OVER (PARTITION BY u64_wide), \
@@ -123,6 +128,7 @@ fn criterion_benchmark(c: &mut Criterion) {
             b.iter(|| {
                 query(
                     ctx.clone(),
+                    &rt,
                     "SELECT \
                   MAX(f64) OVER (PARTITION BY u64_narrow), \
                   MIN(f32) OVER (PARTITION BY u64_narrow), \
@@ -137,6 +143,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT \
                   FIRST_VALUE(f64) OVER (PARTITION BY u64_wide), \
                   LAST_VALUE(f32) OVER (PARTITION BY u64_wide), \
@@ -150,6 +157,7 @@ fn criterion_benchmark(c: &mut Criterion) {
         b.iter(|| {
             query(
                 ctx.clone(),
+                &rt,
                 "SELECT \
                   FIRST_VALUE(f64) OVER (PARTITION BY u64_narrow), \
                   LAST_VALUE(f32) OVER (PARTITION BY u64_narrow), \
@@ -165,6 +173,7 @@ fn criterion_benchmark(c: &mut Criterion) {
             b.iter(|| {
                 query(
                     ctx.clone(),
+                    &rt,
                     "SELECT \
                         MAX(f64) OVER (PARTITION BY u64_wide ORDER by f64), \
                         MIN(f32) OVER (PARTITION BY u64_wide ORDER by f64), \
@@ -181,6 +190,7 @@ fn criterion_benchmark(c: &mut Criterion) {
             b.iter(|| {
                 query(
                     ctx.clone(),
+                    &rt,
                     "SELECT \
                         MAX(f64) OVER (PARTITION BY u64_narrow ORDER by f64), \
                         MIN(f32) OVER (PARTITION BY u64_narrow ORDER by f64), \
@@ -197,6 +207,7 @@ fn criterion_benchmark(c: &mut Criterion) {
             b.iter(|| {
                 query(
                     ctx.clone(),
+                    &rt,
                     "SELECT \
                         FIRST_VALUE(f64) OVER (PARTITION BY u64_wide ORDER by f64), \
                         LAST_VALUE(f32) OVER (PARTITION BY u64_wide ORDER by f64), \
@@ -213,6 +224,7 @@ fn criterion_benchmark(c: &mut Criterion) {
             b.iter(|| {
                 query(
                     ctx.clone(),
+                    &rt,
                     "SELECT \
                         FIRST_VALUE(f64) OVER (PARTITION BY u64_narrow ORDER by f64), \
                         LAST_VALUE(f32) OVER (PARTITION BY u64_narrow ORDER by f64), \


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to