This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 43f79326106 [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt

43f79326106 is described below

commit 43f79326106acb277b9edfb28c34f5dc310b416b
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Fri Dec 22 13:26:02 2023 +0800

    [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt

    ### What changes were proposed in this pull request?
    This PR adds a check so that a cached partition is marked as materialized only if its task has neither failed nor been interrupted. It also adds a new method, `isFailed`, to `TaskContext`.

    ### Why are the changes needed?
    Before this PR, the task completion listener incremented `materializedPartitions` unconditionally, so a failed or killed task attempt was still counted as a materialized partition. As a result, a task failure while caching could cause an NPE in other tasks:
    ```
    java.lang.NullPointerException
        at java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.accessors1$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    ```

    ### Does this PR introduce _any_ user-facing change?
    Yes, it's a bug fix.

    ### How was this patch tested?
    Added a test.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #44445 from ulysses-you/fix-cache.
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Kent Yao <y...@apache.org> --- core/src/main/scala/org/apache/spark/BarrierTaskContext.scala | 2 ++ core/src/main/scala/org/apache/spark/TaskContext.scala | 5 +++++ core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 2 ++ .../scala/org/apache/spark/scheduler/TaskContextSuite.scala | 10 ++++++++++ project/MimaExcludes.scala | 4 +++- .../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 8 +++++--- 6 files changed, 27 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala index 0f9abaf94ae..50aff8b0fb1 100644 --- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala +++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala @@ -194,6 +194,8 @@ class BarrierTaskContext private[spark] ( override def isCompleted(): Boolean = taskContext.isCompleted() + override def isFailed(): Boolean = taskContext.isFailed() + override def isInterrupted(): Boolean = taskContext.isInterrupted() override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = { diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 0f8a10d734b..15ddd08fb4a 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -94,6 +94,11 @@ abstract class TaskContext extends Serializable { */ def isCompleted(): Boolean + /** + * Returns true if the task has failed. + */ + def isFailed(): Boolean + /** * Returns true if the task has been killed. 
*/ diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index 8d2c2ab9bc4..a3c36de1515 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -275,6 +275,8 @@ private[spark] class TaskContextImpl( @GuardedBy("this") override def isCompleted(): Boolean = synchronized(completed) + override def isFailed(): Boolean = synchronized(failureCauseOpt.isDefined) + override def isInterrupted(): Boolean = reasonIfKilled.isDefined override def getLocalProperty(key: String): String = localProperties.getProperty(key) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index c56fd3fd1f5..9aba41cea21 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -670,6 +670,16 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark assert(invocationOrder === Seq("C", "B", "A", "D")) } + test("SPARK-46480: Add isFailed in TaskContext") { + val context = TaskContext.empty() + var isFailed = false + context.addTaskCompletionListener[Unit] { context => + isFailed = context.isFailed() + } + context.markTaskFailed(new RuntimeException()) + context.markTaskCompleted(None) + assert(isFailed) + } } private object TaskContextSuite { diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 2779340e861..eb4c130cc6a 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -57,7 +57,9 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI$default$3"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI"), // [SPARK-45762][CORE] Support shuffle 
managers defined in user jars by changing startup order - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this"), + // [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.isFailed") ) // Default exclude rules diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index af958208afd..c016fd52b61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -285,9 +285,11 @@ case class CachedRDDBuilder( cachedPlan.conf) } val cached = cb.mapPartitionsInternal { it => - TaskContext.get().addTaskCompletionListener[Unit](_ => { - materializedPartitions.add(1L) - }) + TaskContext.get().addTaskCompletionListener[Unit] { context => + if (!context.isFailed() && !context.isInterrupted()) { + materializedPartitions.add(1L) + } + } new Iterator[CachedBatch] { override def hasNext: Boolean = it.hasNext override def next(): CachedBatch = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org