This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new e52b0487583 [SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe e52b0487583 is described below commit e52b0487583314ae159dab3496be3c28df3e56b7 Author: Cheng Pan <cheng...@apache.org> AuthorDate: Tue May 17 18:26:55 2022 -0500 [SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe ### What changes were proposed in this pull request? Add `synchronized` to the method `isCachedColumnBuffersLoaded`. ### Why are the changes needed? `isCachedColumnBuffersLoaded` should be wrapped in `synchronized`; otherwise it may throw an NPE when `_cachedColumnBuffers` is modified concurrently. ``` def isCachedColumnBuffersLoaded: Boolean = { _cachedColumnBuffers != null && isCachedRDDLoaded } def isCachedRDDLoaded: Boolean = { _cachedColumnBuffersAreLoaded || { val bmMaster = SparkEnv.get.blockManager.master val rddLoaded = _cachedColumnBuffers.partitions.forall { partition => bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, partition.index), false) .exists { case(_, blockStatus) => blockStatus.isCached } } if (rddLoaded) { _cachedColumnBuffersAreLoaded = rddLoaded } rddLoaded } } ``` ``` java.lang.NullPointerException at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247) at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241) at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189) at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176) at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303) at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297) at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108) at scala.collection.TraversableLike.filter(TraversableLike.scala:395) at scala.collection.TraversableLike.filter$(TraversableLike.scala:395) at scala.collection.AbstractTraversable.filter(Traversable.scala:108) at org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219) at org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176) at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220) at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UT. Closes #36496 from pan3793/SPARK-39104. 
Authored-by: Cheng Pan <cheng...@apache.org> Signed-off-by: Sean Owen <sro...@gmail.com> (cherry picked from commit 3c8d8d7a864281fbe080316ad8de9b8eac80fa71) Signed-off-by: Sean Owen <sro...@gmail.com> --- .../sql/execution/columnar/InMemoryRelation.scala | 9 +++- .../columnar/InMemoryColumnarQuerySuite.scala | 53 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 89323e7d1a4..0ace24777b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -238,10 +238,15 @@ case class CachedRDDBuilder( } def isCachedColumnBuffersLoaded: Boolean = { - _cachedColumnBuffers != null && isCachedRDDLoaded + if (_cachedColumnBuffers != null) { + synchronized { + return _cachedColumnBuffers != null && isCachedRDDLoaded + } + } + false } - def isCachedRDDLoaded: Boolean = { + private def isCachedRDDLoaded: Boolean = { _cachedColumnBuffersAreLoaded || { val bmMaster = SparkEnv.get.blockManager.master val rddLoaded = _cachedColumnBuffers.partitions.forall { partition => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 120ddf469f4..779aa49a344 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.columnar import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} +import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.{DataFrame, QueryTest, Row} @@ -563,4 +564,56 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-39104: InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe") { + val plan = spark.range(1).queryExecution.executedPlan + val serializer = new TestCachedBatchSerializer(true, 1) + val cachedRDDBuilder = CachedRDDBuilder(serializer, MEMORY_ONLY, plan, None) + + @volatile var isCachedColumnBuffersLoaded = false + @volatile var stopped = false + + val th1 = new Thread { + override def run(): Unit = { + while (!isCachedColumnBuffersLoaded && !stopped) { + cachedRDDBuilder.cachedColumnBuffers + cachedRDDBuilder.clearCache() + } + } + } + + val th2 = new Thread { + override def run(): Unit = { + while (!isCachedColumnBuffersLoaded && !stopped) { + isCachedColumnBuffersLoaded = cachedRDDBuilder.isCachedColumnBuffersLoaded + } + } + } + + val th3 = new Thread { + override def run(): Unit = { + Thread.sleep(3000L) + stopped = true + } + } + + val exceptionCnt = new AtomicInteger + val exceptionHandler: Thread.UncaughtExceptionHandler = (_: Thread, cause: Throwable) => { + exceptionCnt.incrementAndGet + fail(cause) + } + + th1.setUncaughtExceptionHandler(exceptionHandler) + th2.setUncaughtExceptionHandler(exceptionHandler) + th1.start() + th2.start() + th3.start() + th1.join() + th2.join() + th3.join() + + cachedRDDBuilder.clearCache() + + assert(exceptionCnt.get == 0) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org