This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ac74cd3163 Minor: Add `RuntimeEnvBuilder::build_arc()` (#12213)
ac74cd3163 is described below

commit ac74cd3163e43563807a8c6e8e72bb058cb6f459
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Sep 2 09:02:42 2024 -0400

    Minor: Add `RuntimeEnvBuilder::build_arc()` (#12213)
---
 datafusion/core/src/execution/context/mod.rs       |  6 ++--
 datafusion/core/tests/fuzz_cases/sort_fuzz.rs      | 10 +++---
 datafusion/core/tests/memory_limit/mod.rs          | 11 +++---
 datafusion/core/tests/parquet/file_statistics.rs   | 11 +++---
 datafusion/execution/src/runtime_env.rs            |  5 +++
 datafusion/execution/src/task.rs                   |  4 +--
 datafusion/physical-plan/src/aggregates/mod.rs     | 18 ++++------
 datafusion/physical-plan/src/joins/cross_join.rs   |  8 ++---
 datafusion/physical-plan/src/joins/hash_join.rs    | 16 ++++-----
 .../physical-plan/src/joins/nested_loop_join.rs    |  8 ++---
 .../physical-plan/src/joins/sort_merge_join.rs     | 40 +++++++++-------------
 datafusion/physical-plan/src/repartition/mod.rs    |  8 ++---
 datafusion/physical-plan/src/sorts/sort.rs         | 22 +++++-------
 datafusion/wasmtest/src/lib.rs                     | 10 +++---
 14 files changed, 75 insertions(+), 102 deletions(-)

diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index c67424c0fa..06dc797ae2 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -219,13 +219,13 @@ where
 /// // configure a memory limit of 1GB with 20%  slop
 ///  let runtime_env = RuntimeEnvBuilder::new()
 ///     .with_memory_limit(1024 * 1024 * 1024, 0.80)
-///     .build()
+///     .build_arc()
 ///     .unwrap();
 ///
 /// // Create a SessionState using the config and runtime_env
 /// let state = SessionStateBuilder::new()
 ///   .with_config(config)
-///   .with_runtime_env(Arc::new(runtime_env))
+///   .with_runtime_env(runtime_env)
 ///   // include support for built in functions and configurations
 ///   .with_default_features()
 ///   .build();
@@ -1758,7 +1758,7 @@ mod tests {
         let path = path.join("tests/tpch-csv");
         let url = format!("file://{}", path.display());
 
-        let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
+        let runtime = RuntimeEnvBuilder::new().build_arc()?;
         let cfg = SessionConfig::new()
             .set_str("datafusion.catalog.location", url.as_str())
             .set_str("datafusion.catalog.format", "CSV")
diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
index 1980589491..fae4731569 100644
--- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -136,12 +136,10 @@ impl SortTest {
                     .sort_spill_reservation_bytes,
             );
 
-            let runtime = Arc::new(
-                RuntimeEnvBuilder::new()
-                    .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
-                    .build()
-                    .unwrap(),
-            );
+            let runtime = RuntimeEnvBuilder::new()
+                .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
+                .build_arc()
+                .unwrap();
             SessionContext::new_with_config_rt(session_config, runtime)
         } else {
             SessionContext::new_with_config(session_config)
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 592c25dedc..69ef6058a2 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -509,21 +509,20 @@ impl TestCase {
 
         let table = scenario.table();
 
-        let rt_config = RuntimeEnvBuilder::new()
+        let mut builder = RuntimeEnvBuilder::new()
             // disk manager setting controls the spilling
             .with_disk_manager(disk_manager_config)
             .with_memory_limit(memory_limit, MEMORY_FRACTION);
 
-        let runtime = if let Some(pool) = memory_pool {
-            rt_config.with_memory_pool(pool).build().unwrap()
-        } else {
-            rt_config.build().unwrap()
+        if let Some(pool) = memory_pool {
+            builder = builder.with_memory_pool(pool);
         };
+        let runtime = builder.build_arc().unwrap();
 
         // Configure execution
         let builder = SessionStateBuilder::new()
             .with_config(config)
-            .with_runtime_env(Arc::new(runtime))
+            .with_runtime_env(runtime)
             .with_default_features();
         let builder = match scenario.rules() {
             Some(rules) => builder.with_physical_optimizer_rules(rules),
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
index bd251f1a66..cd62c3bf42 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -197,12 +197,11 @@ fn get_cache_runtime_state() -> (
         .with_files_statistics_cache(Some(file_static_cache.clone()))
         .with_list_files_cache(Some(list_file_cache.clone()));
 
-    let rt = Arc::new(
-        RuntimeEnvBuilder::new()
-            .with_cache_manager(cache_config)
-            .build()
-            .expect("could not build runtime environment"),
-    );
+    let rt = RuntimeEnvBuilder::new()
+        .with_cache_manager(cache_config)
+        .build_arc()
+        .expect("could not build runtime environment");
+
     let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state();
 
     (file_static_cache, list_file_cache, state)
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index e7b48be95c..574d387ae6 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -246,4 +246,9 @@ impl RuntimeEnvBuilder {
             object_store_registry: self.object_store_registry,
         })
     }
+
+    /// Convenience method to create a new `Arc<RuntimeEnv>`
+    pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
+        self.build().map(Arc::new)
+    }
 }
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 35689b8e08..57fcac0ee5 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -58,7 +58,7 @@ pub struct TaskContext {
 impl Default for TaskContext {
     fn default() -> Self {
         let runtime = RuntimeEnvBuilder::new()
-            .build()
+            .build_arc()
             .expect("default runtime created successfully");
 
         // Create a default task context, mostly useful for testing
@@ -69,7 +69,7 @@ impl Default for TaskContext {
             scalar_functions: HashMap::new(),
             aggregate_functions: HashMap::new(),
             window_functions: HashMap::new(),
-            runtime: Arc::new(runtime),
+            runtime,
         }
     }
 }
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index e66a57fd2e..764227e5e7 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1331,12 +1331,10 @@ mod tests {
 
     fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
         let session_config = SessionConfig::new().with_batch_size(batch_size);
-        let runtime = Arc::new(
-            RuntimeEnvBuilder::default()
-                .with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
-                .build()
-                .unwrap(),
-        );
+        let runtime = RuntimeEnvBuilder::default()
+            .with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
+            .build_arc()
+            .unwrap();
         let task_ctx = TaskContext::default()
             .with_session_config(session_config)
             .with_runtime(runtime);
@@ -1815,11 +1813,9 @@ mod tests {
         let input: Arc<dyn ExecutionPlan> = Arc::new(TestYieldingExec::new(true));
         let input_schema = input.schema();
 
-        let runtime = Arc::new(
-            RuntimeEnvBuilder::default()
-                .with_memory_limit(1, 1.0)
-                .build()?,
-        );
+        let runtime = RuntimeEnvBuilder::default()
+            .with_memory_limit(1, 1.0)
+            .build_arc()?;
         let task_ctx = TaskContext::default().with_runtime(runtime);
         let task_ctx = Arc::new(task_ctx);
 
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index b99d4f17c4..11153556f2 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -673,11 +673,9 @@ mod tests {
 
     #[tokio::test]
     async fn test_overallocation() -> Result<()> {
-        let runtime = Arc::new(
-            RuntimeEnvBuilder::new()
-                .with_memory_limit(100, 1.0)
-                .build()?,
-        );
+        let runtime = RuntimeEnvBuilder::new()
+            .with_memory_limit(100, 1.0)
+            .build_arc()?;
         let task_ctx = TaskContext::default().with_runtime(runtime);
         let task_ctx = Arc::new(task_ctx);
 
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index f20d00e1a2..38827108e8 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -3798,11 +3798,9 @@ mod tests {
         ];
 
         for join_type in join_types {
-            let runtime = Arc::new(
-                RuntimeEnvBuilder::new()
-                    .with_memory_limit(100, 1.0)
-                    .build()?,
-            );
+            let runtime = RuntimeEnvBuilder::new()
+                .with_memory_limit(100, 1.0)
+                .build_arc()?;
             let task_ctx = TaskContext::default().with_runtime(runtime);
             let task_ctx = Arc::new(task_ctx);
 
@@ -3874,11 +3872,9 @@ mod tests {
         ];
 
         for join_type in join_types {
-            let runtime = Arc::new(
-                RuntimeEnvBuilder::new()
-                    .with_memory_limit(100, 1.0)
-                    .build()?,
-            );
+            let runtime = RuntimeEnvBuilder::new()
+                .with_memory_limit(100, 1.0)
+                .build_arc()?;
             let session_config = SessionConfig::default().with_batch_size(50);
             let task_ctx = TaskContext::default()
                 .with_session_config(session_config)
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 3cd3735441..dadd20714e 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -1019,11 +1019,9 @@ mod tests {
         ];
 
         for join_type in join_types {
-            let runtime = Arc::new(
-                RuntimeEnvBuilder::new()
-                    .with_memory_limit(100, 1.0)
-                    .build()?,
-            );
+            let runtime = RuntimeEnvBuilder::new()
+                .with_memory_limit(100, 1.0)
+                .build_arc()?;
             let task_ctx = TaskContext::default().with_runtime(runtime);
             let task_ctx = Arc::new(task_ctx);
 
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 09fe5d9ebc..2118c1a526 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -2900,12 +2900,10 @@ mod tests {
         ];
 
         // Disable DiskManager to prevent spilling
-        let runtime = Arc::new(
-            RuntimeEnvBuilder::new()
-                .with_memory_limit(100, 1.0)
-                .with_disk_manager(DiskManagerConfig::Disabled)
-                .build()?,
-        );
+        let runtime = RuntimeEnvBuilder::new()
+            .with_memory_limit(100, 1.0)
+            .with_disk_manager(DiskManagerConfig::Disabled)
+            .build_arc()?;
         let session_config = SessionConfig::default().with_batch_size(50);
 
         for join_type in join_types {
@@ -2987,12 +2985,10 @@ mod tests {
         ];
 
         // Disable DiskManager to prevent spilling
-        let runtime = Arc::new(
-            RuntimeEnvBuilder::new()
-                .with_memory_limit(100, 1.0)
-                .with_disk_manager(DiskManagerConfig::Disabled)
-                .build()?,
-        );
+        let runtime = RuntimeEnvBuilder::new()
+            .with_memory_limit(100, 1.0)
+            .with_disk_manager(DiskManagerConfig::Disabled)
+            .build_arc()?;
         let session_config = SessionConfig::default().with_batch_size(50);
 
         for join_type in join_types {
@@ -3052,12 +3048,10 @@ mod tests {
         ];
 
         // Enable DiskManager to allow spilling
-        let runtime = Arc::new(
-            RuntimeEnvBuilder::new()
-                .with_memory_limit(100, 1.0)
-                .with_disk_manager(DiskManagerConfig::NewOs)
-                .build()?,
-        );
+        let runtime = RuntimeEnvBuilder::new()
+            .with_memory_limit(100, 1.0)
+            .with_disk_manager(DiskManagerConfig::NewOs)
+            .build_arc()?;
 
         for batch_size in [1, 50] {
             let session_config = SessionConfig::default().with_batch_size(batch_size);
@@ -3162,12 +3156,10 @@ mod tests {
         ];
 
         // Enable DiskManager to allow spilling
-        let runtime = Arc::new(
-            RuntimeEnvBuilder::new()
-                .with_memory_limit(500, 1.0)
-                .with_disk_manager(DiskManagerConfig::NewOs)
-                .build()?,
-        );
+        let runtime = RuntimeEnvBuilder::new()
+            .with_memory_limit(500, 1.0)
+            .with_disk_manager(DiskManagerConfig::NewOs)
+            .build_arc()?;
 
         for batch_size in [1, 50] {
             let session_config = SessionConfig::default().with_batch_size(batch_size);
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 650006a9d0..47e5192c23 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -1506,11 +1506,9 @@ mod tests {
         let partitioning = Partitioning::RoundRobinBatch(4);
 
         // setup up context
-        let runtime = Arc::new(
-            RuntimeEnvBuilder::default()
-                .with_memory_limit(1, 1.0)
-                .build()?,
-        );
+        let runtime = RuntimeEnvBuilder::default()
+            .with_memory_limit(1, 1.0)
+            .build_arc()?;
 
         let task_ctx = TaskContext::default().with_runtime(runtime);
         let task_ctx = Arc::new(task_ctx);
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index e004119401..fa9628abdf 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -1148,11 +1148,9 @@ mod tests {
             .options()
             .execution
             .sort_spill_reservation_bytes;
-        let runtime = Arc::new(
-            RuntimeEnvBuilder::new()
-                .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
-                .build()?,
-        );
+        let runtime = RuntimeEnvBuilder::new()
+            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
+            .build_arc()?;
         let task_ctx = Arc::new(
             TaskContext::default()
                 .with_session_config(session_config)
@@ -1226,14 +1224,12 @@ mod tests {
                 .execution
                 .sort_spill_reservation_bytes;
 
-            let runtime = Arc::new(
-                RuntimeEnvBuilder::new()
-                    .with_memory_limit(
-                        sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
-                        1.0,
-                    )
-                    .build()?,
-            );
+            let runtime = RuntimeEnvBuilder::new()
+                .with_memory_limit(
+                    sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
+                    1.0,
+                )
+                .build_arc()?;
             let task_ctx = Arc::new(
                 TaskContext::default()
                     .with_runtime(runtime)
diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs
index 50325d262d..0f24449cbe 100644
--- a/datafusion/wasmtest/src/lib.rs
+++ b/datafusion/wasmtest/src/lib.rs
@@ -98,12 +98,10 @@ mod test {
         let sql = "SELECT 2 + 2;";
 
         // Execute SQL (using datafusion)
-        let rt = Arc::new(
-            RuntimeEnvBuilder::new()
-                .with_disk_manager(DiskManagerConfig::Disabled)
-                .build()
-                .unwrap(),
-        );
+        let rt = RuntimeEnvBuilder::new()
+            .with_disk_manager(DiskManagerConfig::Disabled)
+            .build_arc()
+            .unwrap();
         let session_config = SessionConfig::new().with_target_partitions(1);
         let session_context =
             Arc::new(SessionContext::new_with_config_rt(session_config, rt));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to