This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new f3fbd9307 chore: Use checked operations when growing or shrinking unified memory pool (#2455)
f3fbd9307 is described below
commit f3fbd930733ec43967fac8a14fc16a166d65dcdf
Author: Andy Grove <[email protected]>
AuthorDate: Fri Sep 26 10:31:32 2025 -0600
chore: Use checked operations when growing or shrinking unified memory pool (#2455)
---
native/core/src/execution/memory_pools/mod.rs | 4 +-
.../src/execution/memory_pools/unified_pool.rs | 68 +++++++++++++---------
2 files changed, 44 insertions(+), 28 deletions(-)
diff --git a/native/core/src/execution/memory_pools/mod.rs
b/native/core/src/execution/memory_pools/mod.rs
index 39b5a0ca3..c1fa5bbca 100644
--- a/native/core/src/execution/memory_pools/mod.rs
+++ b/native/core/src/execution/memory_pools/mod.rs
@@ -28,7 +28,7 @@ use jni::objects::GlobalRef;
use once_cell::sync::OnceCell;
use std::num::NonZeroUsize;
use std::sync::Arc;
-use unified_pool::CometMemoryPool;
+use unified_pool::CometUnifiedMemoryPool;
pub(crate) use config::*;
pub(crate) use task_shared::*;
@@ -42,7 +42,7 @@ pub(crate) fn create_memory_pool(
match memory_pool_config.pool_type {
MemoryPoolType::Unified => {
// Set Comet memory pool for native
- let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+ let memory_pool =
CometUnifiedMemoryPool::new(comet_task_memory_manager);
Arc::new(TrackConsumersPool::new(
memory_pool,
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
diff --git a/native/core/src/execution/memory_pools/unified_pool.rs
b/native/core/src/execution/memory_pools/unified_pool.rs
index 8712200d6..bbde72258 100644
--- a/native/core/src/execution/memory_pools/unified_pool.rs
+++ b/native/core/src/execution/memory_pools/unified_pool.rs
@@ -23,42 +23,43 @@ use std::{
},
};
-use jni::objects::GlobalRef;
-
-use datafusion::{
- common::{resources_datafusion_err, DataFusionError},
- execution::memory_pool::{MemoryPool, MemoryReservation},
-};
-
use crate::{
errors::CometResult,
jvm_bridge::{jni_call, JVMClasses},
};
+use datafusion::{
+ common::{resources_datafusion_err, DataFusionError},
+ execution::memory_pool::{MemoryPool, MemoryReservation},
+};
+use jni::objects::GlobalRef;
+use log::warn;
-/// A DataFusion `MemoryPool` implementation for Comet. Internally this is
-/// implemented via delegating calls to
[`crate::jvm_bridge::CometTaskMemoryManager`].
-pub struct CometMemoryPool {
+/// A DataFusion `MemoryPool` implementation for Comet that delegates to
+/// Spark's off-heap executor memory pool via JNI by calling
+/// [`crate::jvm_bridge::CometTaskMemoryManager`].
+pub struct CometUnifiedMemoryPool {
task_memory_manager_handle: Arc<GlobalRef>,
used: AtomicUsize,
}
-impl Debug for CometMemoryPool {
+impl Debug for CometUnifiedMemoryPool {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
- f.debug_struct("CometMemoryPool")
+ f.debug_struct("CometUnifiedMemoryPool")
.field("used", &self.used.load(Relaxed))
.finish()
}
}
-impl CometMemoryPool {
- pub fn new(task_memory_manager_handle: Arc<GlobalRef>) -> CometMemoryPool {
+impl CometUnifiedMemoryPool {
+ pub fn new(task_memory_manager_handle: Arc<GlobalRef>) ->
CometUnifiedMemoryPool {
Self {
task_memory_manager_handle,
used: AtomicUsize::new(0),
}
}
- fn acquire(&self, additional: usize) -> CometResult<i64> {
+ /// Request memory from Spark's off-heap memory pool via JNI
+ fn acquire_from_spark(&self, additional: usize) -> CometResult<i64> {
let mut env = JVMClasses::get_env()?;
let handle = self.task_memory_manager_handle.as_obj();
unsafe {
@@ -67,7 +68,8 @@ impl CometMemoryPool {
}
}
- fn release(&self, size: usize) -> CometResult<()> {
+ /// Release memory to Spark's off-heap memory pool via JNI
+ fn release_to_spark(&self, size: usize) -> CometResult<()> {
let mut env = JVMClasses::get_env()?;
let handle = self.task_memory_manager_handle.as_obj();
unsafe {
@@ -76,37 +78,42 @@ impl CometMemoryPool {
}
}
-impl Drop for CometMemoryPool {
+impl Drop for CometUnifiedMemoryPool {
fn drop(&mut self) {
let used = self.used.load(Relaxed);
if used != 0 {
- log::warn!("CometMemoryPool dropped with {used} bytes still
reserved");
+ warn!("CometUnifiedMemoryPool dropped with {used} bytes still
reserved");
}
}
}
-unsafe impl Send for CometMemoryPool {}
-unsafe impl Sync for CometMemoryPool {}
+unsafe impl Send for CometUnifiedMemoryPool {}
+unsafe impl Sync for CometUnifiedMemoryPool {}
-impl MemoryPool for CometMemoryPool {
+impl MemoryPool for CometUnifiedMemoryPool {
fn grow(&self, reservation: &MemoryReservation, additional: usize) {
self.try_grow(reservation, additional).unwrap();
}
fn shrink(&self, _: &MemoryReservation, size: usize) {
- self.release(size)
+ self.release_to_spark(size)
.unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
- self.used.fetch_sub(size, Relaxed);
+ if let Err(prev) = self
+ .used
+ .fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
+ {
+ panic!("overflow when releasing {size} of {prev} bytes");
+ }
}
fn try_grow(&self, _: &MemoryReservation, additional: usize) -> Result<(),
DataFusionError> {
if additional > 0 {
- let acquired = self.acquire(additional)?;
+ let acquired = self.acquire_from_spark(additional)?;
// If the number of bytes we acquired is less than the requested,
return an error,
// and hopefully will trigger spilling from the caller side.
if acquired < additional as i64 {
// Release the acquired bytes before throwing error
- self.release(acquired as usize)?;
+ self.release_to_spark(acquired as usize)?;
return Err(resources_datafusion_err!(
"Failed to acquire {} bytes, only got {}. Reserved: {}",
@@ -115,7 +122,16 @@ impl MemoryPool for CometMemoryPool {
self.reserved()
));
}
- self.used.fetch_add(acquired as usize, Relaxed);
+ if let Err(prev) = self
+ .used
+ .fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired
as usize))
+ {
+ return Err(resources_datafusion_err!(
+ "Failed to acquire {} bytes due to overflow. Reserved: {}",
+ additional,
+ prev
+ ));
+ }
}
Ok(())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]