This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new e52b0487583 [SPARK-39104][SQL] 
InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe
e52b0487583 is described below

commit e52b0487583314ae159dab3496be3c28df3e56b7
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Tue May 17 18:26:55 2022 -0500

    [SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be 
thread-safe
    
    ### What changes were proposed in this pull request?
    
    Wrap the body of `isCachedColumnBuffersLoaded` in a `synchronized` block and make `isCachedRDDLoaded` private.
    
    ### Why are the changes needed?
    
    `isCachedColumnBuffersLoaded` should be wrapped in `synchronized`; otherwise it 
may throw an NPE when `_cachedColumnBuffers` is modified concurrently.
    
    ```
    def isCachedColumnBuffersLoaded: Boolean = {
      _cachedColumnBuffers != null && isCachedRDDLoaded
    }
    
    def isCachedRDDLoaded: Boolean = {
        _cachedColumnBuffersAreLoaded || {
          val bmMaster = SparkEnv.get.blockManager.master
          val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
            bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, 
partition.index), false)
              .exists { case(_, blockStatus) => blockStatus.isCached }
          }
          if (rddLoaded) {
            _cachedColumnBuffersAreLoaded = rddLoaded
          }
          rddLoaded
      }
    }
    ```
    
    ```
    java.lang.NullPointerException
        at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247)
        at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241)
        at 
org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189)
        at 
org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176)
        at 
scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at 
scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
        at 
scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
        at 
scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
        at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
        at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
        at 
org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219)
        at 
org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176)
        at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220)
        at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UT, plus a new concurrency test added to `InMemoryColumnarQuerySuite`.
    
    Closes #36496 from pan3793/SPARK-39104.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Sean Owen <sro...@gmail.com>
    (cherry picked from commit 3c8d8d7a864281fbe080316ad8de9b8eac80fa71)
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../sql/execution/columnar/InMemoryRelation.scala  |  9 +++-
 .../columnar/InMemoryColumnarQuerySuite.scala      | 53 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 89323e7d1a4..0ace24777b7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -238,10 +238,15 @@ case class CachedRDDBuilder(
   }
 
   def isCachedColumnBuffersLoaded: Boolean = {
-    _cachedColumnBuffers != null && isCachedRDDLoaded
+    if (_cachedColumnBuffers != null) {
+      synchronized {
+        return _cachedColumnBuffers != null && isCachedRDDLoaded
+      }
+    }
+    false
   }
 
-  def isCachedRDDLoaded: Boolean = {
+  private def isCachedRDDLoaded: Boolean = {
       _cachedColumnBuffersAreLoaded || {
         val bmMaster = SparkEnv.get.blockManager.master
         val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 120ddf469f4..779aa49a344 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
+import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
@@ -563,4 +564,56 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-39104: InMemoryRelation#isCachedColumnBuffersLoaded should be 
thread-safe") {
+    val plan = spark.range(1).queryExecution.executedPlan
+    val serializer = new TestCachedBatchSerializer(true, 1)
+    val cachedRDDBuilder = CachedRDDBuilder(serializer, MEMORY_ONLY, plan, 
None)
+
+    @volatile var isCachedColumnBuffersLoaded = false
+    @volatile var stopped = false
+
+    val th1 = new Thread {
+      override def run(): Unit = {
+        while (!isCachedColumnBuffersLoaded && !stopped) {
+          cachedRDDBuilder.cachedColumnBuffers
+          cachedRDDBuilder.clearCache()
+        }
+      }
+    }
+
+    val th2 = new Thread {
+      override def run(): Unit = {
+        while (!isCachedColumnBuffersLoaded && !stopped) {
+          isCachedColumnBuffersLoaded = 
cachedRDDBuilder.isCachedColumnBuffersLoaded
+        }
+      }
+    }
+
+    val th3 = new Thread {
+      override def run(): Unit = {
+        Thread.sleep(3000L)
+        stopped = true
+      }
+    }
+
+    val exceptionCnt = new AtomicInteger
+    val exceptionHandler: Thread.UncaughtExceptionHandler = (_: Thread, cause: 
Throwable) => {
+        exceptionCnt.incrementAndGet
+        fail(cause)
+      }
+
+    th1.setUncaughtExceptionHandler(exceptionHandler)
+    th2.setUncaughtExceptionHandler(exceptionHandler)
+    th1.start()
+    th2.start()
+    th3.start()
+    th1.join()
+    th2.join()
+    th3.join()
+
+    cachedRDDBuilder.clearCache()
+
+    assert(exceptionCnt.get == 0)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to