baibaichen opened a new issue, #11485:
URL: https://github.com/apache/incubator-gluten/issues/11485

   ### Backend
   
   VL (Velox)
   
   ### Bug description
   
   There is a race condition in `VeloxMemoryManager::getOrCreateArrowMemoryPool` because the `weak_ptr` lookup and the destructor callback that erases the map entry do not run in the same critical section.
   
   **Current Implementation:**
   
   ```cpp
   // VeloxMemoryManager.h
   std::unordered_map<std::string, std::weak_ptr<ArrowMemoryPool>> arrowPools_;
   
   // VeloxMemoryManager.cc
   std::shared_ptr<arrow::MemoryPool> VeloxMemoryManager::getOrCreateArrowMemoryPool(const std::string& name) {
     std::lock_guard<std::mutex> l(mutex_);
     if (const auto it = arrowPools_.find(name); it != arrowPools_.end()) {
       auto pool = it->second.lock();  // ① Try to get shared_ptr
       VELOX_CHECK_NOT_NULL(pool, "Arrow memory pool {} has been destructed", name);  // ② Check
       return pool;
     }
     auto pool = std::make_shared<ArrowMemoryPool>(
         blockListener_.get(), [this, name](arrow::MemoryPool* pool) { this->dropMemoryPool(name); });  // ③ Destructor callback
     arrowPools_.emplace(name, pool);
     return pool;
   }
   
   void VeloxMemoryManager::dropMemoryPool(const std::string& name) {
     std::lock_guard<std::mutex> l(mutex_);  // ④ Acquire lock
     const auto ret = arrowPools_.erase(name);
     VELOX_CHECK_EQ(ret, 1, "Child memory pool {} doesn't exist", name);
   }
   
   // ArrowMemoryPool.cc
   ArrowMemoryPool::~ArrowMemoryPool() {
     if (releaser_ != nullptr) {
       releaser_(this);  // Calls dropMemoryPool(name)
     }
   }
   ```
   
   **Race Condition Scenario:**
   
   ```
   Timeline:
   T1: VeloxParquetDataSource::close() 
       → parquetWriter_ destructs 
       → ArrowMemoryPool destruction begins
       → Destructor callback dropMemoryPool() called
       → Acquires mutex_ lock
       → arrowPools_.erase(name) ✓
       → Releases lock
   
   T2: getOrCreateArrowMemoryPool(name) called
       → Acquires mutex_ lock (after T1 releases the lock, but before ArrowMemoryPool destruction completes)
       → arrowPools_.find(name) not found (already erased)
       → Creates new ArrowMemoryPool
       → But old pool may still be destructing...
   ```
   
   **More severe scenario:**
   
   ```
   T1: weak_ptr::lock() returns shared_ptr (ref count = 1)
       → Releases mutex_
       
   T2: Last shared_ptr destruction begins
       → Calls dropMemoryPool() 
       → Acquires mutex_
       → erase succeeds
       → Releases mutex_
       
   T3: getOrCreateArrowMemoryPool(name)
       → Acquires mutex_
       → find(name) not found
       → Creates new pool and emplaces
       → Releases mutex_
       
   ```
   
   **Root Cause:**
   1. `dropMemoryPool` is called during `shared_ptr` destruction, but it only removes the entry from the map.
   2. Removing the entry and destroying the object are NOT atomic: after `dropMemoryPool` completes and releases the lock, the `ArrowMemoryPool` destructor may still be executing.
   3. A successful `lock()` doesn't guarantee object stability: after `getOrCreateArrowMemoryPool` releases the lock, the returned `shared_ptr` may be the last reference.
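
A related window can be observed deterministically in a single thread. The sketch below is not Gluten code: `FakePool`, `Observation`, and `observeRegistryDuringDestruction` are hypothetical names, and the registry is a bare map with no mutex. It assumes the behavior of mainstream standard library implementations, where a `weak_ptr` is already expired by the time the managed object's destructor runs. Under that assumption the destructor callback sees a map entry that is still present but whose `lock()` returns null. If another thread acquired `mutex_` before `dropMemoryPool` did, that is the state in which `VELOX_CHECK_NOT_NULL` in `getOrCreateArrowMemoryPool` would fail.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for ArrowMemoryPool: invokes a callback from its destructor.
struct FakePool {
  explicit FakePool(std::function<void()> onDestroy) : onDestroy_(std::move(onDestroy)) {}
  ~FakePool() {
    if (onDestroy_) {
      onDestroy_();
    }
  }
  std::function<void()> onDestroy_;
};

// What the destructor callback saw in the registry before erasing its entry.
struct Observation {
  bool entryPresent = false;
  bool lockReturnedNull = false;
};

Observation observeRegistryDuringDestruction() {
  std::unordered_map<std::string, std::weak_ptr<FakePool>> registry;
  Observation seen;
  const std::string name = "pool";

  auto pool = std::make_shared<FakePool>([&registry, &seen, name] {
    // This runs inside ~FakePool, i.e. after the strong count reached zero.
    auto it = registry.find(name);
    seen.entryPresent = (it != registry.end());
    seen.lockReturnedNull = seen.entryPresent && (it->second.lock() == nullptr);
    registry.erase(name);  // mirrors what dropMemoryPool does
  });
  registry.emplace(name, pool);

  pool.reset();  // drops the last strong reference; the destructor runs here
  return seen;
}
```

Calling `observeRegistryDuringDestruction()` should return an `Observation` with both flags set: the entry is found, yet it can no longer be locked. In the real code the same state is visible to any thread that holds `mutex_` while the destructing thread is still waiting to enter `dropMemoryPool`.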
   
   **Call Path:**
   ```
   Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_init
     → VeloxRuntime::createDataSource
     → datasource->init(datasourceOptions)
     → VeloxParquetDataSource::init
     → makeParquetWriteOption(sparkConfs)
     → getDefaultMemoryManager()->getOrCreateArrowMemoryPool("VeloxParquetWrite.ArrowMemoryPool")
   ```
   
   **Suggested Fix:**
   Change `weak_ptr` to `shared_ptr` so that `VeloxMemoryManager` always holds a strong reference:
   
   ```cpp
   // VeloxMemoryManager.h
   std::unordered_map<std::string, std::shared_ptr<ArrowMemoryPool>> arrowPools_;
   
   // VeloxMemoryManager.cc  
   std::shared_ptr<arrow::MemoryPool> VeloxMemoryManager::getOrCreateArrowMemoryPool(const std::string& name) {
     std::lock_guard<std::mutex> l(mutex_);
     if (const auto it = arrowPools_.find(name); it != arrowPools_.end()) {
       return it->second;
     }
     auto pool = std::make_shared<ArrowMemoryPool>(blockListener_.get());
     arrowPools_.emplace(name, pool);
     return pool;
   }
   // Remove dropMemoryPool() function and destructor callback
   ```
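
To illustrate the effect of the suggested change in isolation, here is a minimal, self-contained sketch of a registry that holds strong references. `StubPool` and `StrongRegistry` are hypothetical names used only for this example and are not part of Gluten or Arrow. The point it shows is that a caller dropping its reference no longer destroys the pool, so a later lookup under the same name gets the same instance and no destructor callback is needed.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for ArrowMemoryPool; it has no destructor callback.
struct StubPool {};

// Hypothetical registry that owns its pools through shared_ptr.
class StrongRegistry {
 public:
  std::shared_ptr<StubPool> getOrCreate(const std::string& name) {
    std::lock_guard<std::mutex> l(mutex_);
    if (const auto it = pools_.find(name); it != pools_.end()) {
      return it->second;  // always valid: the map itself keeps the pool alive
    }
    auto pool = std::make_shared<StubPool>();
    pools_.emplace(name, pool);
    return pool;
  }

 private:
  std::mutex mutex_;
  std::unordered_map<std::string, std::shared_ptr<StubPool>> pools_;
};
```

The trade-off in this sketch is lifetime: each pool lives until the registry itself is destroyed, so whatever a pool holds stays allocated for that long. Whether that is acceptable for `VeloxMemoryManager` depends on how many distinct pool names are used, which this example does not try to answer.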
   
   ### Gluten version
   
   main branch
   
   ### Spark version
   
   Spark-3.5.x
   
   ### Spark configurations
   
   _No response_
   
   ### System information
   
   _No response_
   
   ### Relevant logs
   
   _No response_



