This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7153fac  Improve configuration and resource use of `MemoryManager` and `DiskManager` (#1668)
7153fac is described below

commit 7153fac7d8ff77c71a67cd2ba3ec55d94a4cd794
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Jan 25 16:34:24 2022 -0500

    Improve configuration and resource use of `MemoryManager` and `DiskManager` (#1668)
    
    * Improve configuration and resource use of `MemoryManager` and `DiskManager`
    
    * fmt
---
 ballista/rust/executor/src/executor.rs     |   8 +-
 datafusion/src/execution/context.rs        |  94 +++++++++++++++-
 datafusion/src/execution/disk_manager.rs   | 106 +++++++++++++-----
 datafusion/src/execution/memory_manager.rs | 165 ++++++++++++++++++++++++-----
 datafusion/src/execution/mod.rs            |   3 +
 datafusion/src/execution/runtime_env.rs    |  81 ++++++--------
 datafusion/src/physical_plan/sorts/sort.rs |  10 +-
 7 files changed, 355 insertions(+), 112 deletions(-)

diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
index e7479bd..6bf1aeb 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -24,9 +24,10 @@ use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::protobuf;
 use ballista_core::serde::scheduler::ExecutorSpecification;
 use datafusion::error::DataFusionError;
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::prelude::ExecutionConfig;
 
 /// Ballista executor
 pub struct Executor {
@@ -87,9 +88,8 @@ impl Executor {
             ))
         }?;
 
-        let runtime_config =
-            RuntimeConfig::new().with_local_dirs(vec![self.work_dir.clone()]);
-        let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+        let config = 
ExecutionConfig::new().with_temp_file_path(self.work_dir.clone());
+        let runtime = Arc::new(RuntimeEnv::new(config.runtime)?);
 
         let partitions = exec.execute_shuffle_write(part, runtime).await?;
 
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index ceea83d..61cbf3a 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -39,7 +39,6 @@ use crate::{
     },
 };
 use log::debug;
-use std::fs;
 use std::path::Path;
 use std::string::String;
 use std::sync::Arc;
@@ -47,6 +46,7 @@ use std::{
     collections::{HashMap, HashSet},
     sync::Mutex,
 };
+use std::{fs, path::PathBuf};
 
 use futures::{StreamExt, TryStreamExt};
 use tokio::task::{self, JoinHandle};
@@ -94,7 +94,12 @@ use chrono::{DateTime, Utc};
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
 
-use super::options::{AvroReadOptions, CsvReadOptions};
+use super::{
+    disk_manager::DiskManagerConfig,
+    memory_manager::MemoryManagerConfig,
+    options::{AvroReadOptions, CsvReadOptions},
+    DiskManager, MemoryManager,
+};
 
 /// ExecutionContext is the main interface for executing queries with 
DataFusion. The context
 /// provides the following functionality:
@@ -195,6 +200,11 @@ impl ExecutionContext {
         }
     }
 
+    /// Return the [RuntimeEnv] used to run queries with this 
[ExecutionContext]
+    pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
+        self.state.lock().unwrap().runtime_env.clone()
+    }
+
     /// Creates a dataframe that will execute a SQL query.
     ///
     /// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
@@ -718,7 +728,7 @@ impl ExecutionContext {
         let path = path.as_ref();
         // create directory to contain the CSV files (one per partition)
         let fs_path = Path::new(path);
-        let runtime = self.state.lock().unwrap().runtime_env.clone();
+        let runtime = self.runtime_env();
         match fs::create_dir(fs_path) {
             Ok(()) => {
                 let mut tasks = vec![];
@@ -758,7 +768,7 @@ impl ExecutionContext {
         let path = path.as_ref();
         // create directory to contain the Parquet files (one per partition)
         let fs_path = Path::new(path);
-        let runtime = self.state.lock().unwrap().runtime_env.clone();
+        let runtime = self.runtime_env();
         match fs::create_dir(fs_path) {
             Ok(()) => {
                 let mut tasks = vec![];
@@ -1057,6 +1067,48 @@ impl ExecutionConfig {
         self.runtime = config;
         self
     }
+
+    /// Use an an existing [MemoryManager]
+    pub fn with_existing_memory_manager(mut self, existing: 
Arc<MemoryManager>) -> Self {
+        self.runtime = self
+            .runtime
+            .with_memory_manager(MemoryManagerConfig::new_existing(existing));
+        self
+    }
+
+    /// Specify the total memory to use while running the DataFusion
+    /// plan to `max_memory * memory_fraction` in bytes.
+    ///
+    /// Note DataFusion does not yet respect this limit in all cases.
+    pub fn with_memory_limit(
+        mut self,
+        max_memory: usize,
+        memory_fraction: f64,
+    ) -> Result<Self> {
+        self.runtime =
+            self.runtime
+                .with_memory_manager(MemoryManagerConfig::try_new_limit(
+                    max_memory,
+                    memory_fraction,
+                )?);
+        Ok(self)
+    }
+
+    /// Use an an existing [DiskManager]
+    pub fn with_existing_disk_manager(mut self, existing: Arc<DiskManager>) -> 
Self {
+        self.runtime = self
+            .runtime
+            .with_disk_manager(DiskManagerConfig::new_existing(existing));
+        self
+    }
+
+    /// Use the specified path to create any needed temporary files
+    pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
+        self.runtime = self
+            .runtime
+            
.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()]));
+        self
+    }
 }
 
 /// Holds per-execution properties and data (such as starting timestamps, etc).
@@ -1246,6 +1298,40 @@ mod tests {
     use tempfile::TempDir;
     use test::*;
 
+    #[tokio::test]
+    async fn shared_memory_and_disk_manager() {
+        // Demonstrate the ability to share DiskManager and
+        // MemoryManager between two different executions.
+        let ctx1 = ExecutionContext::new();
+
+        // configure with same memory / disk manager
+        let memory_manager = ctx1.runtime_env().memory_manager.clone();
+        let disk_manager = ctx1.runtime_env().disk_manager.clone();
+        let config = ExecutionConfig::new()
+            .with_existing_memory_manager(memory_manager.clone())
+            .with_existing_disk_manager(disk_manager.clone());
+
+        let ctx2 = ExecutionContext::with_config(config);
+
+        assert!(std::ptr::eq(
+            Arc::as_ptr(&memory_manager),
+            Arc::as_ptr(&ctx1.runtime_env().memory_manager)
+        ));
+        assert!(std::ptr::eq(
+            Arc::as_ptr(&memory_manager),
+            Arc::as_ptr(&ctx2.runtime_env().memory_manager)
+        ));
+
+        assert!(std::ptr::eq(
+            Arc::as_ptr(&disk_manager),
+            Arc::as_ptr(&ctx1.runtime_env().disk_manager)
+        ));
+        assert!(std::ptr::eq(
+            Arc::as_ptr(&disk_manager),
+            Arc::as_ptr(&ctx2.runtime_env().disk_manager)
+        ));
+    }
+
     #[test]
     fn optimize_explain() {
         let schema = Schema::new(vec![Field::new("id", DataType::Int32, 
false)]);
diff --git a/datafusion/src/execution/disk_manager.rs b/datafusion/src/execution/disk_manager.rs
index c4a6b1d..c98df3b 100644
--- a/datafusion/src/execution/disk_manager.rs
+++ b/datafusion/src/execution/disk_manager.rs
@@ -19,30 +19,86 @@
 //! hashed among the directories listed in RuntimeConfig::local_dirs.
 
 use crate::error::{DataFusionError, Result};
-use log::info;
+use log::{debug, info};
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
 use std::collections::hash_map::DefaultHasher;
 use std::fs::File;
 use std::hash::{Hash, Hasher};
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 use tempfile::{Builder, TempDir};
 
+/// Configuration for temporary disk access
+#[derive(Debug, Clone)]
+pub enum DiskManagerConfig {
+    /// Use the provided [DiskManager] instance
+    Existing(Arc<DiskManager>),
+
+    /// Create a new [DiskManager] that creates temporary files within
+    /// a temporary directory chosen by the OS
+    NewOs,
+
+    /// Create a new [DiskManager] that creates temporary files within
+    /// the specified directories
+    NewSpecified(Vec<PathBuf>),
+}
+
+impl Default for DiskManagerConfig {
+    fn default() -> Self {
+        Self::NewOs
+    }
+}
+
+impl DiskManagerConfig {
+    /// Create temporary files in a temporary directory chosen by the OS
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Create temporary files using the provided disk manager
+    pub fn new_existing(existing: Arc<DiskManager>) -> Self {
+        Self::Existing(existing)
+    }
+
+    /// Create temporary files in the specified directories
+    pub fn new_specified(paths: Vec<PathBuf>) -> Self {
+        Self::NewSpecified(paths)
+    }
+}
+
 /// Manages files generated during query execution, e.g. spill files generated
 /// while processing dataset larger than available memory.
+#[derive(Debug)]
 pub struct DiskManager {
     local_dirs: Vec<TempDir>,
 }
 
 impl DiskManager {
-    /// Create local dirs inside user provided dirs through conf
-    pub fn new(conf_dirs: &[String]) -> Result<Self> {
-        let local_dirs = create_local_dirs(conf_dirs)?;
-        info!(
-            "Created local dirs {:?} as DataFusion working directory",
-            local_dirs
-        );
-        Ok(Self { local_dirs })
+    /// Create a DiskManager given the configuration
+    pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
+        match config {
+            DiskManagerConfig::Existing(manager) => Ok(manager),
+            DiskManagerConfig::NewOs => {
+                let tempdir = 
tempfile::tempdir().map_err(DataFusionError::IoError)?;
+
+                debug!(
+                    "Created directory {:?} as DataFusion working directory",
+                    tempdir
+                );
+                Ok(Arc::new(Self {
+                    local_dirs: vec![tempdir],
+                }))
+            }
+            DiskManagerConfig::NewSpecified(conf_dirs) => {
+                let local_dirs = create_local_dirs(conf_dirs)?;
+                info!(
+                    "Created local dirs {:?} as DataFusion working directory",
+                    local_dirs
+                );
+                Ok(Arc::new(Self { local_dirs }))
+            }
+        }
     }
 
     /// Create a file in conf dirs in randomized manner and return the file 
path
@@ -52,20 +108,18 @@ impl DiskManager {
 }
 
 /// Setup local dirs by creating one new dir in each of the given dirs
-fn create_local_dirs(local_dir: &[String]) -> Result<Vec<TempDir>> {
-    local_dir
+fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
+    local_dirs
         .iter()
-        .map(|root| create_dir(root, "datafusion-"))
+        .map(|root| {
+            Builder::new()
+                .prefix("datafusion-")
+                .tempdir_in(root)
+                .map_err(DataFusionError::IoError)
+        })
         .collect()
 }
 
-fn create_dir(root: &str, prefix: &str) -> Result<TempDir> {
-    Builder::new()
-        .prefix(prefix)
-        .tempdir_in(root)
-        .map_err(DataFusionError::IoError)
-}
-
 fn get_file(file_name: &str, local_dirs: &[TempDir]) -> String {
     let mut hasher = DefaultHasher::new();
     file_name.hash(&mut hasher);
@@ -98,8 +152,8 @@ fn rand_name() -> String {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
     use crate::error::Result;
-    use crate::execution::disk_manager::{get_file, DiskManager};
     use tempfile::TempDir;
 
     #[test]
@@ -107,13 +161,13 @@ mod tests {
         let local_dir1 = TempDir::new()?;
         let local_dir2 = TempDir::new()?;
         let local_dir3 = TempDir::new()?;
-        let local_dirs = vec![
-            local_dir1.path().to_str().unwrap().to_string(),
-            local_dir2.path().to_str().unwrap().to_string(),
-            local_dir3.path().to_str().unwrap().to_string(),
-        ];
+        let config = DiskManagerConfig::new_specified(vec![
+            local_dir1.path().into(),
+            local_dir2.path().into(),
+            local_dir3.path().into(),
+        ]);
 
-        let dm = DiskManager::new(&local_dirs)?;
+        let dm = DiskManager::try_new(config)?;
         let actual = dm.create_tmp_file()?;
         let name = actual.rsplit_once(std::path::MAIN_SEPARATOR).unwrap().1;
 
diff --git a/datafusion/src/execution/memory_manager.rs b/datafusion/src/execution/memory_manager.rs
index caa597b..32f7975 100644
--- a/datafusion/src/execution/memory_manager.rs
+++ b/datafusion/src/execution/memory_manager.rs
@@ -17,7 +17,7 @@
 
 //! Manages all available memory during query execution
 
-use crate::error::Result;
+use crate::error::{DataFusionError, Result};
 use async_trait::async_trait;
 use hashbrown::HashMap;
 use log::info;
@@ -28,6 +28,84 @@ use std::sync::{Arc, Condvar, Mutex, Weak};
 
 static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
 
+#[derive(Debug, Clone)]
+/// Configuration information for memory management
+pub enum MemoryManagerConfig {
+    /// Use the existing [MemoryManager]
+    Existing(Arc<MemoryManager>),
+
+    /// Create a new [MemoryManager] that will use up to some
+    /// fraction of total system memory.
+    New {
+        /// Max execution memory allowed for DataFusion.  Defaults to
+        /// `usize::MAX`, which will not attempt to limit the memory
+        /// used during plan execution.
+        max_memory: usize,
+
+        /// The fraction of `max_memory` that the memory manager will
+        /// use for execution.
+        ///
+        /// The purpose of this config is to set aside memory for
+        /// untracked data structures, and imprecise size estimation
+        /// during memory acquisition.  Defaults to 0.7
+        memory_fraction: f64,
+    },
+}
+
+impl Default for MemoryManagerConfig {
+    fn default() -> Self {
+        Self::New {
+            max_memory: usize::MAX,
+            memory_fraction: 0.7,
+        }
+    }
+}
+
+impl MemoryManagerConfig {
+    /// Create a new memory [MemoryManager] with no limit on the
+    /// memory used
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Create a configuration based on an existing [MemoryManager]
+    pub fn new_existing(existing: Arc<MemoryManager>) -> Self {
+        Self::Existing(existing)
+    }
+
+    /// Create a new [MemoryManager] with a `max_memory` and `fraction`
+    pub fn try_new_limit(max_memory: usize, memory_fraction: f64) -> 
Result<Self> {
+        if max_memory == 0 {
+            return Err(DataFusionError::Plan(format!(
+                "invalid max_memory. Expected greater than 0, got {}",
+                max_memory
+            )));
+        }
+        if !(memory_fraction > 0f64 && memory_fraction <= 1f64) {
+            return Err(DataFusionError::Plan(format!(
+                "invalid fraction. Expected greater than 0 and less than 1.0, 
got {}",
+                memory_fraction
+            )));
+        }
+
+        Ok(Self::New {
+            max_memory,
+            memory_fraction,
+        })
+    }
+
+    /// return the maximum size of the memory, in bytes, this config will allow
+    fn pool_size(&self) -> usize {
+        match self {
+            MemoryManagerConfig::Existing(existing) => existing.pool_size,
+            MemoryManagerConfig::New {
+                max_memory,
+                memory_fraction,
+            } => (*max_memory as f64 * *memory_fraction) as usize,
+        }
+    }
+}
+
 fn next_id() -> usize {
     CONSUMER_ID.fetch_add(1, Ordering::SeqCst)
 }
@@ -165,6 +243,7 @@ The memory management architecture is the following:
 */
 
 /// Manage memory usage during physical plan execution
+#[derive(Debug)]
 pub struct MemoryManager {
     requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn 
MemoryConsumer>>>>,
     trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
@@ -174,19 +253,27 @@ pub struct MemoryManager {
 }
 
 impl MemoryManager {
-    /// Create new memory manager based on max available pool_size
+    /// Create new memory manager based on the configuration
     #[allow(clippy::mutex_atomic)]
-    pub fn new(pool_size: usize) -> Self {
-        info!(
-            "Creating memory manager with initial size {}",
-            human_readable_size(pool_size)
-        );
-        Self {
-            requesters: Arc::new(Mutex::new(HashMap::new())),
-            trackers: Arc::new(Mutex::new(HashMap::new())),
-            pool_size,
-            requesters_total: Arc::new(Mutex::new(0)),
-            cv: Condvar::new(),
+    pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
+        let pool_size = config.pool_size();
+
+        match config {
+            MemoryManagerConfig::Existing(manager) => manager,
+            MemoryManagerConfig::New { .. } => {
+                info!(
+                    "Creating memory manager with initial size {}",
+                    human_readable_size(pool_size)
+                );
+
+                Arc::new(Self {
+                    requesters: Arc::new(Mutex::new(HashMap::new())),
+                    trackers: Arc::new(Mutex::new(HashMap::new())),
+                    pool_size,
+                    requesters_total: Arc::new(Mutex::new(0)),
+                    cv: Condvar::new(),
+                })
+            }
         }
     }
 
@@ -328,10 +415,8 @@ fn human_readable_size(size: usize) -> String {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
     use crate::error::Result;
-    use crate::execution::memory_manager::{
-        ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
-    };
     use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use async_trait::async_trait;
     use std::sync::atomic::{AtomicUsize, Ordering};
@@ -438,11 +523,10 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn basic_functionalities() -> Result<()> {
+    async fn basic_functionalities() {
         let config = RuntimeConfig::new()
-            .with_memory_fraction(1.0)
-            .with_max_execution_memory(100);
-        let runtime = Arc::new(RuntimeEnv::new(config)?);
+            .with_memory_manager(MemoryManagerConfig::try_new_limit(100, 
1.0).unwrap());
+        let runtime = Arc::new(RuntimeEnv::new(config).unwrap());
 
         let tracker1 = Arc::new(DummyTracker::new(0, runtime.clone(), 5));
         runtime.register_consumer(&(tracker1.clone() as Arc<dyn 
MemoryConsumer>));
@@ -463,8 +547,8 @@ mod tests {
         runtime.register_consumer(&(requester1.clone() as Arc<dyn 
MemoryConsumer>));
 
         // first requester entered, should be able to use any of the remaining 
80
-        requester1.do_with_mem(40).await?;
-        requester1.do_with_mem(10).await?;
+        requester1.do_with_mem(40).await.unwrap();
+        requester1.do_with_mem(10).await.unwrap();
         assert_eq!(requester1.get_spills(), 0);
         assert_eq!(requester1.mem_used(), 50);
         assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(), 
50);
@@ -472,17 +556,46 @@ mod tests {
         let requester2 = Arc::new(DummyRequester::new(0, runtime.clone()));
         runtime.register_consumer(&(requester2.clone() as Arc<dyn 
MemoryConsumer>));
 
-        requester2.do_with_mem(20).await?;
-        requester2.do_with_mem(30).await?;
+        requester2.do_with_mem(20).await.unwrap();
+        requester2.do_with_mem(30).await.unwrap();
         assert_eq!(requester2.get_spills(), 1);
         assert_eq!(requester2.mem_used(), 30);
 
-        requester1.do_with_mem(10).await?;
+        requester1.do_with_mem(10).await.unwrap();
         assert_eq!(requester1.get_spills(), 1);
         assert_eq!(requester1.mem_used(), 10);
 
         assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(), 
40);
+    }
 
-        Ok(())
+    #[tokio::test]
+    #[should_panic(expected = "invalid max_memory. Expected greater than 0, 
got 0")]
+    async fn test_try_new_with_limit_0() {
+        MemoryManagerConfig::try_new_limit(0, 1.0).unwrap();
+    }
+
+    #[tokio::test]
+    #[should_panic(
+        expected = "invalid fraction. Expected greater than 0 and less than 
1.0, got -9.6"
+    )]
+    async fn test_try_new_with_limit_neg_fraction() {
+        MemoryManagerConfig::try_new_limit(100, -9.6).unwrap();
+    }
+
+    #[tokio::test]
+    #[should_panic(
+        expected = "invalid fraction. Expected greater than 0 and less than 
1.0, got 9.6"
+    )]
+    async fn test_try_new_with_limit_too_large() {
+        MemoryManagerConfig::try_new_limit(100, 9.6).unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_try_new_with_limit_pool_size() {
+        let config = MemoryManagerConfig::try_new_limit(100, 0.5).unwrap();
+        assert_eq!(config.pool_size(), 50);
+
+        let config = MemoryManagerConfig::try_new_limit(100000, 0.1).unwrap();
+        assert_eq!(config.pool_size(), 10000);
     }
 }
diff --git a/datafusion/src/execution/mod.rs b/datafusion/src/execution/mod.rs
index 0c92627..e3b42ae 100644
--- a/datafusion/src/execution/mod.rs
+++ b/datafusion/src/execution/mod.rs
@@ -23,3 +23,6 @@ pub(crate) mod disk_manager;
 pub mod memory_manager;
 pub mod options;
 pub mod runtime_env;
+
+pub use disk_manager::DiskManager;
+pub use memory_manager::{MemoryConsumer, MemoryConsumerId, MemoryManager};
diff --git a/datafusion/src/execution/runtime_env.rs b/datafusion/src/execution/runtime_env.rs
index 1e1aecd..cdcd1f7 100644
--- a/datafusion/src/execution/runtime_env.rs
+++ b/datafusion/src/execution/runtime_env.rs
@@ -18,17 +18,25 @@
 //! Execution runtime environment that tracks memory, disk and various 
configurations
 //! that are used during physical plan execution.
 
-use crate::error::Result;
-use crate::execution::disk_manager::DiskManager;
-use crate::execution::memory_manager::{MemoryConsumer, MemoryConsumerId, 
MemoryManager};
+use crate::{
+    error::Result,
+    execution::{
+        disk_manager::{DiskManager, DiskManagerConfig},
+        memory_manager::{
+            MemoryConsumer, MemoryConsumerId, MemoryManager, 
MemoryManagerConfig,
+        },
+    },
+};
+
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
 #[derive(Clone)]
-/// Execution runtime environment
+/// Execution runtime environment. This structure is passed to the
+/// physical plans when they are run.
 pub struct RuntimeEnv {
-    /// Runtime configuration
-    pub config: RuntimeConfig,
+    /// Default batch size while creating new batches
+    pub batch_size: usize,
     /// Runtime memory management
     pub memory_manager: Arc<MemoryManager>,
     /// Manage temporary files during query execution
@@ -44,20 +52,22 @@ impl Debug for RuntimeEnv {
 impl RuntimeEnv {
     /// Create env based on configuration
     pub fn new(config: RuntimeConfig) -> Result<Self> {
-        let memory_manager = Arc::new(MemoryManager::new(
-            (config.max_memory as f64 * config.memory_fraction) as usize,
-        ));
-        let disk_manager = Arc::new(DiskManager::new(&config.local_dirs)?);
-        Ok(Self {
-            config,
+        let RuntimeConfig {
+            batch_size,
             memory_manager,
             disk_manager,
+        } = config;
+
+        Ok(Self {
+            batch_size,
+            memory_manager: MemoryManager::new(memory_manager),
+            disk_manager: DiskManager::try_new(disk_manager)?,
         })
     }
 
     /// Get execution batch size based on config
     pub fn batch_size(&self) -> usize {
-        self.config.batch_size
+        self.batch_size
     }
 
     /// Register the consumer to get it tracked
@@ -84,16 +94,10 @@ pub struct RuntimeConfig {
     /// for buffer-in-memory batches since creating tiny batches would results
     /// in too much metadata memory consumption.
     pub batch_size: usize,
-    /// Max execution memory allowed for DataFusion.
-    /// Defaults to `usize::MAX`
-    pub max_memory: usize,
-    /// The fraction of total memory used for execution.
-    /// The purpose of this config is to set aside memory for untracked data 
structures,
-    /// and imprecise size estimation during memory acquisition.
-    /// Defaults to 0.7
-    pub memory_fraction: f64,
-    /// Local dirs to store temporary files during execution.
-    pub local_dirs: Vec<String>,
+    /// DiskManager to manage temporary disk file usage
+    pub disk_manager: DiskManagerConfig,
+    /// MemoryManager to limit access to memory
+    pub memory_manager: MemoryManagerConfig,
 }
 
 impl RuntimeConfig {
@@ -110,40 +114,25 @@ impl RuntimeConfig {
         self
     }
 
-    /// Customize exec size
-    pub fn with_max_execution_memory(mut self, max_memory: usize) -> Self {
-        assert!(max_memory > 0);
-        self.max_memory = max_memory;
+    /// Customize disk manager
+    pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> 
Self {
+        self.disk_manager = disk_manager;
         self
     }
 
-    /// Customize exec memory fraction
-    pub fn with_memory_fraction(mut self, fraction: f64) -> Self {
-        assert!(fraction > 0f64 && fraction <= 1f64);
-        self.memory_fraction = fraction;
-        self
-    }
-
-    /// Customize exec size
-    pub fn with_local_dirs(mut self, local_dirs: Vec<String>) -> Self {
-        assert!(!local_dirs.is_empty());
-        self.local_dirs = local_dirs;
+    /// Customize memory manager
+    pub fn with_memory_manager(mut self, memory_manager: MemoryManagerConfig) 
-> Self {
+        self.memory_manager = memory_manager;
         self
     }
 }
 
 impl Default for RuntimeConfig {
     fn default() -> Self {
-        let tmp_dir = tempfile::tempdir().unwrap();
-        let path = tmp_dir.path().to_str().unwrap().to_string();
-        std::mem::forget(tmp_dir);
-
         Self {
             batch_size: 8192,
-            // Effectively "no limit"
-            max_memory: usize::MAX,
-            memory_fraction: 0.7,
-            local_dirs: vec![path],
+            disk_manager: DiskManagerConfig::default(),
+            memory_manager: MemoryManagerConfig::default(),
         }
     }
 }
diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs
index 456023f..2933a5b 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -564,7 +564,7 @@ async fn do_sort(
 mod tests {
     use super::*;
     use crate::datasource::object_store::local::LocalFileSystem;
-    use crate::execution::runtime_env::RuntimeConfig;
+    use crate::execution::context::ExecutionConfig;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::memory::MemoryExec;
@@ -648,11 +648,9 @@ mod tests {
 
     #[tokio::test]
     async fn test_sort_spill() -> Result<()> {
-        let config = RuntimeConfig::new()
-            .with_memory_fraction(1.0)
-            // trigger spill there will be 4 batches with 5.5KB for each
-            .with_max_execution_memory(12288);
-        let runtime = Arc::new(RuntimeEnv::new(config)?);
+        // trigger spill there will be 4 batches with 5.5KB for each
+        let config = ExecutionConfig::new().with_memory_limit(12288, 1.0)?;
+        let runtime = Arc::new(RuntimeEnv::new(config.runtime)?);
 
         let schema = test_util::aggr_test_schema();
         let partitions = 4;

Reply via email to