This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ac74cd3163 Minor: Add `RuntimeEnvBuilder::build_arc()` (#12213)
ac74cd3163 is described below
commit ac74cd3163e43563807a8c6e8e72bb058cb6f459
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Sep 2 09:02:42 2024 -0400
Minor: Add `RuntimeEnvBuilder::build_arc()` (#12213)
---
datafusion/core/src/execution/context/mod.rs | 6 ++--
datafusion/core/tests/fuzz_cases/sort_fuzz.rs | 10 +++---
datafusion/core/tests/memory_limit/mod.rs | 11 +++---
datafusion/core/tests/parquet/file_statistics.rs | 11 +++---
datafusion/execution/src/runtime_env.rs | 5 +++
datafusion/execution/src/task.rs | 4 +--
datafusion/physical-plan/src/aggregates/mod.rs | 18 ++++------
datafusion/physical-plan/src/joins/cross_join.rs | 8 ++---
datafusion/physical-plan/src/joins/hash_join.rs | 16 ++++-----
.../physical-plan/src/joins/nested_loop_join.rs | 8 ++---
.../physical-plan/src/joins/sort_merge_join.rs | 40 +++++++++-------------
datafusion/physical-plan/src/repartition/mod.rs | 8 ++---
datafusion/physical-plan/src/sorts/sort.rs | 22 +++++-------
datafusion/wasmtest/src/lib.rs | 10 +++---
14 files changed, 75 insertions(+), 102 deletions(-)
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index c67424c0fa..06dc797ae2 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -219,13 +219,13 @@ where
/// // configure a memory limit of 1GB with 20% slop
/// let runtime_env = RuntimeEnvBuilder::new()
/// .with_memory_limit(1024 * 1024 * 1024, 0.80)
-/// .build()
+/// .build_arc()
/// .unwrap();
///
/// // Create a SessionState using the config and runtime_env
/// let state = SessionStateBuilder::new()
/// .with_config(config)
-/// .with_runtime_env(Arc::new(runtime_env))
+/// .with_runtime_env(runtime_env)
/// // include support for built in functions and configurations
/// .with_default_features()
/// .build();
@@ -1758,7 +1758,7 @@ mod tests {
let path = path.join("tests/tpch-csv");
let url = format!("file://{}", path.display());
- let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
+ let runtime = RuntimeEnvBuilder::new().build_arc()?;
let cfg = SessionConfig::new()
.set_str("datafusion.catalog.location", url.as_str())
.set_str("datafusion.catalog.format", "CSV")
diff --git a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
index 1980589491..fae4731569 100644
--- a/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_fuzz.rs
@@ -136,12 +136,10 @@ impl SortTest {
.sort_spill_reservation_bytes,
);
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
- .build()
- .unwrap(),
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
+ .build_arc()
+ .unwrap();
SessionContext::new_with_config_rt(session_config, runtime)
} else {
SessionContext::new_with_config(session_config)
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 592c25dedc..69ef6058a2 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -509,21 +509,20 @@ impl TestCase {
let table = scenario.table();
- let rt_config = RuntimeEnvBuilder::new()
+ let mut builder = RuntimeEnvBuilder::new()
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);
- let runtime = if let Some(pool) = memory_pool {
- rt_config.with_memory_pool(pool).build().unwrap()
- } else {
- rt_config.build().unwrap()
+ if let Some(pool) = memory_pool {
+ builder = builder.with_memory_pool(pool);
};
+ let runtime = builder.build_arc().unwrap();
// Configure execution
let builder = SessionStateBuilder::new()
.with_config(config)
- .with_runtime_env(Arc::new(runtime))
+ .with_runtime_env(runtime)
.with_default_features();
let builder = match scenario.rules() {
Some(rules) => builder.with_physical_optimizer_rules(rules),
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
index bd251f1a66..cd62c3bf42 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -197,12 +197,11 @@ fn get_cache_runtime_state() -> (
.with_files_statistics_cache(Some(file_static_cache.clone()))
.with_list_files_cache(Some(list_file_cache.clone()));
- let rt = Arc::new(
- RuntimeEnvBuilder::new()
- .with_cache_manager(cache_config)
- .build()
- .expect("could not build runtime environment"),
- );
+ let rt = RuntimeEnvBuilder::new()
+ .with_cache_manager(cache_config)
+ .build_arc()
+ .expect("could not build runtime environment");
+
let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state();
(file_static_cache, list_file_cache, state)
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index e7b48be95c..574d387ae6 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -246,4 +246,9 @@ impl RuntimeEnvBuilder {
object_store_registry: self.object_store_registry,
})
}
+
+ /// Convenience method to create a new `Arc<RuntimeEnv>`
+ pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
+ self.build().map(Arc::new)
+ }
}
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 35689b8e08..57fcac0ee5 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -58,7 +58,7 @@ pub struct TaskContext {
impl Default for TaskContext {
fn default() -> Self {
let runtime = RuntimeEnvBuilder::new()
- .build()
+ .build_arc()
.expect("default runtime created successfully");
// Create a default task context, mostly useful for testing
@@ -69,7 +69,7 @@ impl Default for TaskContext {
scalar_functions: HashMap::new(),
aggregate_functions: HashMap::new(),
window_functions: HashMap::new(),
- runtime: Arc::new(runtime),
+ runtime,
}
}
}
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index e66a57fd2e..764227e5e7 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1331,12 +1331,10 @@ mod tests {
fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
let session_config = SessionConfig::new().with_batch_size(batch_size);
- let runtime = Arc::new(
- RuntimeEnvBuilder::default()
- .with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
- .build()
- .unwrap(),
- );
+ let runtime = RuntimeEnvBuilder::default()
+ .with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
+ .build_arc()
+ .unwrap();
let task_ctx = TaskContext::default()
.with_session_config(session_config)
.with_runtime(runtime);
@@ -1815,11 +1813,9 @@ mod tests {
let input: Arc<dyn ExecutionPlan> = Arc::new(TestYieldingExec::new(true));
let input_schema = input.schema();
- let runtime = Arc::new(
- RuntimeEnvBuilder::default()
- .with_memory_limit(1, 1.0)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::default()
+ .with_memory_limit(1, 1.0)
+ .build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index b99d4f17c4..11153556f2 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -673,11 +673,9 @@ mod tests {
#[tokio::test]
async fn test_overallocation() -> Result<()> {
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_limit(100, 1.0)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(100, 1.0)
+ .build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index f20d00e1a2..38827108e8 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -3798,11 +3798,9 @@ mod tests {
];
for join_type in join_types {
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_limit(100, 1.0)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(100, 1.0)
+ .build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);
@@ -3874,11 +3872,9 @@ mod tests {
];
for join_type in join_types {
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_limit(100, 1.0)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(100, 1.0)
+ .build_arc()?;
let session_config = SessionConfig::default().with_batch_size(50);
let task_ctx = TaskContext::default()
.with_session_config(session_config)
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 3cd3735441..dadd20714e 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -1019,11 +1019,9 @@ mod tests {
];
for join_type in join_types {
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_limit(100, 1.0)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(100, 1.0)
+ .build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 09fe5d9ebc..2118c1a526 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -2900,12 +2900,10 @@ mod tests {
];
// Disable DiskManager to prevent spilling
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_limit(100, 1.0)
- .with_disk_manager(DiskManagerConfig::Disabled)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(100, 1.0)
+ .with_disk_manager(DiskManagerConfig::Disabled)
+ .build_arc()?;
let session_config = SessionConfig::default().with_batch_size(50);
for join_type in join_types {
@@ -2987,12 +2985,10 @@ mod tests {
];
// Disable DiskManager to prevent spilling
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_limit(100, 1.0)
- .with_disk_manager(DiskManagerConfig::Disabled)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(100, 1.0)
+ .with_disk_manager(DiskManagerConfig::Disabled)
+ .build_arc()?;
let session_config = SessionConfig::default().with_batch_size(50);
for join_type in join_types {
@@ -3052,12 +3048,10 @@ mod tests {
];
// Enable DiskManager to allow spilling
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_limit(100, 1.0)
- .with_disk_manager(DiskManagerConfig::NewOs)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(100, 1.0)
+ .with_disk_manager(DiskManagerConfig::NewOs)
+ .build_arc()?;
for batch_size in [1, 50] {
let session_config = SessionConfig::default().with_batch_size(batch_size);
@@ -3162,12 +3156,10 @@ mod tests {
];
// Enable DiskManager to allow spilling
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_limit(500, 1.0)
- .with_disk_manager(DiskManagerConfig::NewOs)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(500, 1.0)
+ .with_disk_manager(DiskManagerConfig::NewOs)
+ .build_arc()?;
for batch_size in [1, 50] {
let session_config = SessionConfig::default().with_batch_size(batch_size);
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 650006a9d0..47e5192c23 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -1506,11 +1506,9 @@ mod tests {
let partitioning = Partitioning::RoundRobinBatch(4);
// setup up context
- let runtime = Arc::new(
- RuntimeEnvBuilder::default()
- .with_memory_limit(1, 1.0)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::default()
+ .with_memory_limit(1, 1.0)
+ .build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index e004119401..fa9628abdf 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -1148,11 +1148,9 @@ mod tests {
.options()
.execution
.sort_spill_reservation_bytes;
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
+ .build_arc()?;
let task_ctx = Arc::new(
TaskContext::default()
.with_session_config(session_config)
@@ -1226,14 +1224,12 @@ mod tests {
.execution
.sort_spill_reservation_bytes;
- let runtime = Arc::new(
- RuntimeEnvBuilder::new()
- .with_memory_limit(
- sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
- 1.0,
- )
- .build()?,
- );
+ let runtime = RuntimeEnvBuilder::new()
+ .with_memory_limit(
+ sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
+ 1.0,
+ )
+ .build_arc()?;
let task_ctx = Arc::new(
TaskContext::default()
.with_runtime(runtime)
diff --git a/datafusion/wasmtest/src/lib.rs b/datafusion/wasmtest/src/lib.rs
index 50325d262d..0f24449cbe 100644
--- a/datafusion/wasmtest/src/lib.rs
+++ b/datafusion/wasmtest/src/lib.rs
@@ -98,12 +98,10 @@ mod test {
let sql = "SELECT 2 + 2;";
// Execute SQL (using datafusion)
- let rt = Arc::new(
- RuntimeEnvBuilder::new()
- .with_disk_manager(DiskManagerConfig::Disabled)
- .build()
- .unwrap(),
- );
+ let rt = RuntimeEnvBuilder::new()
+ .with_disk_manager(DiskManagerConfig::Disabled)
+ .build_arc()
+ .unwrap();
let session_config = SessionConfig::new().with_target_partitions(1);
let session_context =
Arc::new(SessionContext::new_with_config_rt(session_config, rt));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]