This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f1d020ae5160 [SPARK-53435][SQL] Fix race condition in CachedRDDBuilder
f1d020ae5160 is described below

commit f1d020ae51605e2a018c535c4ebfe4ae796f0ac5
Author: ziqi liu <ziqi....@databricks.com>
AuthorDate: Mon Sep 1 10:48:37 2025 +0800

    [SPARK-53435][SQL] Fix race condition in CachedRDDBuilder
    
    ### What changes were proposed in this pull request?
    
    There is a race condition between `CachedRDDBuilder.cachedColumnBuffers` and 
`CachedRDDBuilder.clearCache`: when the two calls interleave, 
`cachedColumnBuffers` might return `null`, because its final read of 
`_cachedColumnBuffers` happens outside the `synchronized` block.
    
    This looks like a day-1 bug introduced in 
https://github.com/apache/spark/commit/20ca208bcda6f22fe7d9fb54144de435b4237536#diff-4068fce361a50e3d32af2ba2d4231905f500e7b2da9f46d5ddd99b758c30fd43
    
    ### Why are the changes needed?
    
    The race condition might lead to an NPE 
[here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L303),
 which is caused by a null `RDD` being returned from 
`CachedRDDBuilder.cachedColumnBuffers`.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    Theoretically, this race condition can be triggered whenever cache 
materialization and unpersisting happen on different threads. However, there is 
no reliable way to construct a unit test for it.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    NO
    
    Closes #52174 from liuzqt/SPARK-53435.
    
    Authored-by: ziqi liu <ziqi....@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 871fe3ded668048a9b23aa447be04cb2a7109300)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/execution/columnar/InMemoryRelation.scala  | 27 ++++++----------------
 1 file changed, 7 insertions(+), 20 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index bccc2799ba90..510c5bd00380 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -225,35 +225,22 @@ case class CachedRDDBuilder(
       serializer.supportsColumnarInput(cachedPlan.output)
   }
 
-  def cachedColumnBuffers: RDD[CachedBatch] = {
+  def cachedColumnBuffers: RDD[CachedBatch] = synchronized {
     if (_cachedColumnBuffers == null) {
-      synchronized {
-        if (_cachedColumnBuffers == null) {
-          _cachedColumnBuffers = buildBuffers()
-        }
-      }
+      _cachedColumnBuffers = buildBuffers()
     }
     _cachedColumnBuffers
   }
 
-  def clearCache(blocking: Boolean = false): Unit = {
+  def clearCache(blocking: Boolean = false): Unit = synchronized {
     if (_cachedColumnBuffers != null) {
-      synchronized {
-        if (_cachedColumnBuffers != null) {
-          _cachedColumnBuffers.unpersist(blocking)
-          _cachedColumnBuffers = null
-        }
-      }
+      _cachedColumnBuffers.unpersist(blocking)
+      _cachedColumnBuffers = null
     }
   }
 
-  def isCachedColumnBuffersLoaded: Boolean = {
-    if (_cachedColumnBuffers != null) {
-      synchronized {
-        return _cachedColumnBuffers != null && isCachedRDDLoaded
-      }
-    }
-    false
+  def isCachedColumnBuffersLoaded: Boolean = synchronized {
+    _cachedColumnBuffers != null && isCachedRDDLoaded
   }
 
   private def isCachedRDDLoaded: Boolean = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to