kosiew commented on code in PR #22246:
URL: https://github.com/apache/datafusion/pull/22246#discussion_r3286523260
##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -210,53 +211,76 @@ impl DiskManager {
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
- max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ max_temp_directory_size: AtomicU64::new(
+ DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ ),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
- max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
}
}
+ /// Set the max temp directory size. Requires exclusive access.
+ ///
+ /// Prefer [`Self::update_max_temp_directory_size`] which takes `&self` and
+ /// works through `Arc` without exclusive access.
+ #[deprecated(
+ since = "54.0.0",
+ note = "Use `update_max_temp_directory_size` instead, which takes &self and works through Arc."
+ )]
pub fn set_max_temp_directory_size(
Review Comment:
`set_max_temp_directory_size` still takes `&mut self` and is now deprecated,
but the PR contract says the limit should become dynamically adjustable
through shared access.
As written, code that holds the production `Arc<DiskManager>` still cannot
call `disk_manager.set_max_temp_directory_size(...)`; it has to know about and
use the new `update_*` method instead. Could you please change this public
setter to take `&self` and do the atomic store directly?
It would also be good to add a regression test that calls
`set_max_temp_directory_size` on an `Arc<DiskManager>` through shared access.
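
A minimal sketch of the shape I have in mind, not the actual implementation. `SketchDiskManager`, its `spilling_enabled` flag, and the `String` error are hypothetical stand-ins for `DiskManager`, the `local_dirs` check, and `config_err!`; only the `&self` signature and the atomic store are the point.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `DiskManager`; only the limit is modeled.
pub struct SketchDiskManager {
    /// Stand-in for "local_dirs is Some(..)".
    spilling_enabled: bool,
    max_temp_directory_size: AtomicU64,
}

impl SketchDiskManager {
    pub fn new(spilling_enabled: bool, limit: u64) -> Arc<Self> {
        Arc::new(Self {
            spilling_enabled,
            max_temp_directory_size: AtomicU64::new(limit),
        })
    }

    /// Takes `&self`, so it can be called through `Arc<SketchDiskManager>`.
    pub fn set_max_temp_directory_size(&self, limit: u64) -> Result<(), String> {
        // A non-zero limit is not meaningful when spilling is disabled.
        if !self.spilling_enabled && limit != 0 {
            return Err("spilling is disabled".to_string());
        }
        // Assumption: the limit is a standalone value, so Relaxed is enough.
        self.max_temp_directory_size.store(limit, Ordering::Relaxed);
        Ok(())
    }

    pub fn max_temp_directory_size(&self) -> u64 {
        self.max_temp_directory_size.load(Ordering::Relaxed)
    }
}
```

The regression test would then clone the `Arc`, call the setter on the clone, and read the new limit back through the original handle.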
##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -418,11 +442,15 @@ impl RefCountedTempFile {
// 3. Check if the updated global disk usage exceeds the configured limit
let global_disk_usage =
self.disk_manager.used_disk_space.load(Ordering::Relaxed);
- if global_disk_usage > self.disk_manager.max_temp_directory_size {
+ let limit = self
Review Comment:
I think lowering the limit below current usage can leave `used_disk_space`
permanently inflated after a failed spill update.
`update_disk_usage` subtracts the old size, adds the new size, then returns
an error when `global_disk_usage > limit` before storing
`current_file_disk_usage = new_disk_usage`. If that file is later dropped,
`Drop` subtracts only the old value, not the new value that was already added
to the global counter.
That breaks the stated invariant that usage naturally drops as existing
spill files are released. Could you please either roll back the global counter
on error or record the new per-file size before returning?
Please also add a real temp-file test that lowers the limit, attempts
`update_disk_usage`, drops the file, and verifies `used_disk_space()` returns
to zero.
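
To illustrate the second option (recording the new per-file size before returning), here is a rough sketch. `SketchManager` and `SketchTempFile` are hypothetical, simplified stand-ins for `DiskManager` and `RefCountedTempFile`; the real code measures the file on disk, whereas this sketch takes the new size as an argument.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `DiskManager`: a global counter and a limit.
pub struct SketchManager {
    used_disk_space: AtomicU64,
    max_temp_directory_size: AtomicU64,
}

impl SketchManager {
    pub fn new(limit: u64) -> Arc<Self> {
        Arc::new(Self {
            used_disk_space: AtomicU64::new(0),
            max_temp_directory_size: AtomicU64::new(limit),
        })
    }
    pub fn set_limit(&self, limit: u64) {
        self.max_temp_directory_size.store(limit, Ordering::Relaxed);
    }
    pub fn used(&self) -> u64 {
        self.used_disk_space.load(Ordering::Relaxed)
    }
}

/// Hypothetical stand-in for `RefCountedTempFile`.
pub struct SketchTempFile {
    manager: Arc<SketchManager>,
    current_file_disk_usage: u64,
}

impl SketchTempFile {
    pub fn new(manager: Arc<SketchManager>) -> Self {
        Self { manager, current_file_disk_usage: 0 }
    }

    pub fn update_disk_usage(&mut self, new_disk_usage: u64) -> Result<(), String> {
        // Swap this file's old contribution for the new one.
        self.manager
            .used_disk_space
            .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
        self.manager
            .used_disk_space
            .fetch_add(new_disk_usage, Ordering::Relaxed);
        // Record the new size BEFORE the limit check, so that `Drop`
        // subtracts exactly what was added even if we return an error.
        self.current_file_disk_usage = new_disk_usage;

        let used = self.manager.used();
        let limit = self.manager.max_temp_directory_size.load(Ordering::Relaxed);
        if used > limit {
            return Err(format!("disk usage {used} exceeds limit {limit}"));
        }
        Ok(())
    }
}

impl Drop for SketchTempFile {
    fn drop(&mut self) {
        self.manager
            .used_disk_space
            .fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
    }
}
```

With this ordering, a failed update after the limit is lowered still leaves the counter consistent, and dropping the file brings `used()` back to zero. Rolling back the global counter on the error path would work as well; the sketch only shows one of the two options.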
##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -210,53 +211,76 @@ impl DiskManager {
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
- max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ max_temp_directory_size: AtomicU64::new(
+ DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ ),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
- max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
}
}
+ /// Set the max temp directory size. Requires exclusive access.
+ ///
+ /// Prefer [`Self::update_max_temp_directory_size`] which takes `&self` and
+ /// works through `Arc` without exclusive access.
+ #[deprecated(
+ since = "54.0.0",
+ note = "Use `update_max_temp_directory_size` instead, which takes &self and works through Arc."
+ )]
pub fn set_max_temp_directory_size(
&mut self,
max_temp_directory_size: u64,
) -> Result<()> {
- // If the disk manager is disabled and `max_temp_directory_size` is not 0,
- // this operation is not meaningful, fail early.
+ self.update_max_temp_directory_size(max_temp_directory_size)
+ }
+
+ /// Atomically update the max temp directory size at runtime.
+ ///
+ /// Takes `&self` (not `&mut self`), so it works through `Arc<DiskManager>`
+ /// without requiring exclusive access. Takes effect immediately for
+ /// subsequent spill writes.
+ ///
+ /// Use this when you need to adjust the limit dynamically while queries
+ /// are running (e.g., adapting to available disk space).
+ pub fn update_max_temp_directory_size(
+ &self,
+ max_temp_directory_size: u64,
+ ) -> Result<()> {
if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
return config_err!(
"Cannot set max temp directory size for a disk manager that spilling is disabled"
);
}
- self.max_temp_directory_size = max_temp_directory_size;
+ self.max_temp_directory_size
Review Comment:
Small suggestion: `Ordering::Relaxed` may be enough for this limit
store/load, unless other memory accesses need to be ordered relative to it.
The limit looks like a standalone atomic value, so Acquire/Release may
suggest ordering guarantees that the surrounding code does not seem to rely on.
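
A small sketch of the reasoning, under the assumption that nothing else is published together with the limit. The helper name `lower_limit_from_other_thread` is made up for illustration. A `Relaxed` store is still atomic, so a reader sees either the old or the new limit, never a torn value; here the thread `join` supplies the happens-before edge that makes the final read deterministic.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Stores `new_limit` from a second thread with `Relaxed`, then reads it back.
pub fn lower_limit_from_other_thread(initial: u64, new_limit: u64) -> u64 {
    let limit = Arc::new(AtomicU64::new(initial));

    let writer = {
        let limit = Arc::clone(&limit);
        // No other data is guarded by this value, so Relaxed is sufficient.
        thread::spawn(move || limit.store(new_limit, Ordering::Relaxed))
    };

    // Joining the writer orders its store before the load below.
    writer.join().expect("writer thread panicked");
    limit.load(Ordering::Relaxed)
}
```

If the limit ever needs to be observed together with some other field, that would be the point to revisit Acquire/Release.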
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]