This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new f1d020ae5160 [SPARK-53435][SQL] Fix race condition in CachedRDDBuilder f1d020ae5160 is described below commit f1d020ae51605e2a018c535c4ebfe4ae796f0ac5 Author: ziqi liu <ziqi....@databricks.com> AuthorDate: Mon Sep 1 10:48:37 2025 +0800 [SPARK-53435][SQL] Fix race condition in CachedRDDBuilder ### What changes were proposed in this pull request? There is a race condition between `CachedRDDBuilder.cachedColumnBuffers` and `CachedRDDBuilder.clearCache`: when the two calls interleave, `cachedColumnBuffers` might return `null`, because its final read of `_cachedColumnBuffers` happens outside the `synchronized` block and `clearCache` can reset the field in between. This looks like a day-1 bug introduced in https://github.com/apache/spark/commit/20ca208bcda6f22fe7d9fb54144de435b4237536#diff-4068fce361a50e3d32af2ba2d4231905f500e7b2da9f46d5ddd99b758c30fd43 ### Why are the changes needed? The race condition might lead to an NPE [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L303), caused by a null `RDD` being returned from `CachedRDDBuilder.cachedColumnBuffers`. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Theoretically, this race condition can be triggered whenever cache materialization and unpersisting happen on different threads, but there is no reliable way to construct a unit test for it. ### Was this patch authored or co-authored using generative AI tooling? NO Closes #52174 from liuzqt/SPARK-53435. 
Authored-by: ziqi liu <ziqi....@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 871fe3ded668048a9b23aa447be04cb2a7109300) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/execution/columnar/InMemoryRelation.scala | 27 ++++++---------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index bccc2799ba90..510c5bd00380 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -225,35 +225,22 @@ case class CachedRDDBuilder( serializer.supportsColumnarInput(cachedPlan.output) } - def cachedColumnBuffers: RDD[CachedBatch] = { + def cachedColumnBuffers: RDD[CachedBatch] = synchronized { if (_cachedColumnBuffers == null) { - synchronized { - if (_cachedColumnBuffers == null) { - _cachedColumnBuffers = buildBuffers() - } - } + _cachedColumnBuffers = buildBuffers() } _cachedColumnBuffers } - def clearCache(blocking: Boolean = false): Unit = { + def clearCache(blocking: Boolean = false): Unit = synchronized { if (_cachedColumnBuffers != null) { - synchronized { - if (_cachedColumnBuffers != null) { - _cachedColumnBuffers.unpersist(blocking) - _cachedColumnBuffers = null - } - } + _cachedColumnBuffers.unpersist(blocking) + _cachedColumnBuffers = null } } - def isCachedColumnBuffersLoaded: Boolean = { - if (_cachedColumnBuffers != null) { - synchronized { - return _cachedColumnBuffers != null && isCachedRDDLoaded - } - } - false + def isCachedColumnBuffersLoaded: Boolean = synchronized { + _cachedColumnBuffers != null && isCachedRDDLoaded } private def isCachedRDDLoaded: Boolean = { --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org