This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new f3fbd9307 chore: Use checked operations when growing or shrinking unified memory pool (#2455)
f3fbd9307 is described below

commit f3fbd930733ec43967fac8a14fc16a166d65dcdf
Author: Andy Grove <[email protected]>
AuthorDate: Fri Sep 26 10:31:32 2025 -0600

    chore: Use checked operations when growing or shrinking unified memory pool (#2455)
---
 native/core/src/execution/memory_pools/mod.rs      |  4 +-
 .../src/execution/memory_pools/unified_pool.rs     | 68 +++++++++++++---------
 2 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs
index 39b5a0ca3..c1fa5bbca 100644
--- a/native/core/src/execution/memory_pools/mod.rs
+++ b/native/core/src/execution/memory_pools/mod.rs
@@ -28,7 +28,7 @@ use jni::objects::GlobalRef;
 use once_cell::sync::OnceCell;
 use std::num::NonZeroUsize;
 use std::sync::Arc;
-use unified_pool::CometMemoryPool;
+use unified_pool::CometUnifiedMemoryPool;
 
 pub(crate) use config::*;
 pub(crate) use task_shared::*;
@@ -42,7 +42,7 @@ pub(crate) fn create_memory_pool(
     match memory_pool_config.pool_type {
         MemoryPoolType::Unified => {
             // Set Comet memory pool for native
-            let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+            let memory_pool = 
CometUnifiedMemoryPool::new(comet_task_memory_manager);
             Arc::new(TrackConsumersPool::new(
                 memory_pool,
                 NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
diff --git a/native/core/src/execution/memory_pools/unified_pool.rs b/native/core/src/execution/memory_pools/unified_pool.rs
index 8712200d6..bbde72258 100644
--- a/native/core/src/execution/memory_pools/unified_pool.rs
+++ b/native/core/src/execution/memory_pools/unified_pool.rs
@@ -23,42 +23,43 @@ use std::{
     },
 };
 
-use jni::objects::GlobalRef;
-
-use datafusion::{
-    common::{resources_datafusion_err, DataFusionError},
-    execution::memory_pool::{MemoryPool, MemoryReservation},
-};
-
 use crate::{
     errors::CometResult,
     jvm_bridge::{jni_call, JVMClasses},
 };
+use datafusion::{
+    common::{resources_datafusion_err, DataFusionError},
+    execution::memory_pool::{MemoryPool, MemoryReservation},
+};
+use jni::objects::GlobalRef;
+use log::warn;
 
-/// A DataFusion `MemoryPool` implementation for Comet. Internally this is
-/// implemented via delegating calls to 
[`crate::jvm_bridge::CometTaskMemoryManager`].
-pub struct CometMemoryPool {
+/// A DataFusion `MemoryPool` implementation for Comet that delegates to
+/// Spark's off-heap executor memory pool via JNI by calling
+/// [`crate::jvm_bridge::CometTaskMemoryManager`].
+pub struct CometUnifiedMemoryPool {
     task_memory_manager_handle: Arc<GlobalRef>,
     used: AtomicUsize,
 }
 
-impl Debug for CometMemoryPool {
+impl Debug for CometUnifiedMemoryPool {
     fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
-        f.debug_struct("CometMemoryPool")
+        f.debug_struct("CometUnifiedMemoryPool")
             .field("used", &self.used.load(Relaxed))
             .finish()
     }
 }
 
-impl CometMemoryPool {
-    pub fn new(task_memory_manager_handle: Arc<GlobalRef>) -> CometMemoryPool {
+impl CometUnifiedMemoryPool {
+    pub fn new(task_memory_manager_handle: Arc<GlobalRef>) -> 
CometUnifiedMemoryPool {
         Self {
             task_memory_manager_handle,
             used: AtomicUsize::new(0),
         }
     }
 
-    fn acquire(&self, additional: usize) -> CometResult<i64> {
+    /// Request memory from Spark's off-heap memory pool via JNI
+    fn acquire_from_spark(&self, additional: usize) -> CometResult<i64> {
         let mut env = JVMClasses::get_env()?;
         let handle = self.task_memory_manager_handle.as_obj();
         unsafe {
@@ -67,7 +68,8 @@ impl CometMemoryPool {
         }
     }
 
-    fn release(&self, size: usize) -> CometResult<()> {
+    /// Release memory to Spark's off-heap memory pool via JNI
+    fn release_to_spark(&self, size: usize) -> CometResult<()> {
         let mut env = JVMClasses::get_env()?;
         let handle = self.task_memory_manager_handle.as_obj();
         unsafe {
@@ -76,37 +78,42 @@ impl CometMemoryPool {
     }
 }
 
-impl Drop for CometMemoryPool {
+impl Drop for CometUnifiedMemoryPool {
     fn drop(&mut self) {
         let used = self.used.load(Relaxed);
         if used != 0 {
-            log::warn!("CometMemoryPool dropped with {used} bytes still 
reserved");
+            warn!("CometUnifiedMemoryPool dropped with {used} bytes still 
reserved");
         }
     }
 }
 
-unsafe impl Send for CometMemoryPool {}
-unsafe impl Sync for CometMemoryPool {}
+unsafe impl Send for CometUnifiedMemoryPool {}
+unsafe impl Sync for CometUnifiedMemoryPool {}
 
-impl MemoryPool for CometMemoryPool {
+impl MemoryPool for CometUnifiedMemoryPool {
     fn grow(&self, reservation: &MemoryReservation, additional: usize) {
         self.try_grow(reservation, additional).unwrap();
     }
 
     fn shrink(&self, _: &MemoryReservation, size: usize) {
-        self.release(size)
+        self.release_to_spark(size)
             .unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
-        self.used.fetch_sub(size, Relaxed);
+        if let Err(prev) = self
+            .used
+            .fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
+        {
+            panic!("overflow when releasing {size} of {prev} bytes");
+        }
     }
 
     fn try_grow(&self, _: &MemoryReservation, additional: usize) -> Result<(), 
DataFusionError> {
         if additional > 0 {
-            let acquired = self.acquire(additional)?;
+            let acquired = self.acquire_from_spark(additional)?;
             // If the number of bytes we acquired is less than the requested, 
return an error,
             // and hopefully will trigger spilling from the caller side.
             if acquired < additional as i64 {
                 // Release the acquired bytes before throwing error
-                self.release(acquired as usize)?;
+                self.release_to_spark(acquired as usize)?;
 
                 return Err(resources_datafusion_err!(
                     "Failed to acquire {} bytes, only got {}. Reserved: {}",
@@ -115,7 +122,16 @@ impl MemoryPool for CometMemoryPool {
                     self.reserved()
                 ));
             }
-            self.used.fetch_add(acquired as usize, Relaxed);
+            if let Err(prev) = self
+                .used
+                .fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired 
as usize))
+            {
+                return Err(resources_datafusion_err!(
+                    "Failed to acquire {} bytes due to overflow. Reserved: {}",
+                    additional,
+                    prev
+                ));
+            }
         }
         Ok(())
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to