This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 43f79326106 [SPARK-46480][CORE][SQL] Fix NPE when table cache task 
attempt
43f79326106 is described below

commit 43f79326106acb277b9edfb28c34f5dc310b416b
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Fri Dec 22 13:26:02 2023 +0800

    [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt
    
    ### What changes were proposed in this pull request?
    
    This PR adds a check: we only mark the cached partition as materialized if 
the task has not failed and has not been interrupted. It also adds a new method 
`isFailed` to `TaskContext`.
    
    ### Why are the changes needed?
    
    Before this PR, when caching a table, a task failure could cause an NPE in other tasks:
    
    ```
    java.lang.NullPointerException
            at java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.accessors1$(Unknown
 Source)
            at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
 Source)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
            at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
            at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
            at org.apache.spark.scheduler.Task.run(Task.scala:131)
            at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, it's a bug fix
    
    ### How was this patch tested?
    
    Added a test ("SPARK-46480: Add isFailed in TaskContext") to `TaskContextSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #44445 from ulysses-you/fix-cache.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 core/src/main/scala/org/apache/spark/BarrierTaskContext.scala  |  2 ++
 core/src/main/scala/org/apache/spark/TaskContext.scala         |  5 +++++
 core/src/main/scala/org/apache/spark/TaskContextImpl.scala     |  2 ++
 .../scala/org/apache/spark/scheduler/TaskContextSuite.scala    | 10 ++++++++++
 project/MimaExcludes.scala                                     |  4 +++-
 .../apache/spark/sql/execution/columnar/InMemoryRelation.scala |  8 +++++---
 6 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala 
b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 0f9abaf94ae..50aff8b0fb1 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -194,6 +194,8 @@ class BarrierTaskContext private[spark] (
 
   override def isCompleted(): Boolean = taskContext.isCompleted()
 
+  override def isFailed(): Boolean = taskContext.isFailed()
+
   override def isInterrupted(): Boolean = taskContext.isInterrupted()
 
   override def addTaskCompletionListener(listener: TaskCompletionListener): 
this.type = {
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala 
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 0f8a10d734b..15ddd08fb4a 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -94,6 +94,11 @@ abstract class TaskContext extends Serializable {
    */
   def isCompleted(): Boolean
 
+  /**
+   * Returns true if the task has failed.
+   */
+  def isFailed(): Boolean
+
   /**
    * Returns true if the task has been killed.
    */
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala 
b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 8d2c2ab9bc4..a3c36de1515 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -275,6 +275,8 @@ private[spark] class TaskContextImpl(
   @GuardedBy("this")
   override def isCompleted(): Boolean = synchronized(completed)
 
+  override def isFailed(): Boolean = synchronized(failureCauseOpt.isDefined)
+
   override def isInterrupted(): Boolean = reasonIfKilled.isDefined
 
   override def getLocalProperty(key: String): String = 
localProperties.getProperty(key)
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index c56fd3fd1f5..9aba41cea21 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -670,6 +670,16 @@ class TaskContextSuite extends SparkFunSuite with 
BeforeAndAfter with LocalSpark
     assert(invocationOrder === Seq("C", "B", "A", "D"))
   }
 
+  test("SPARK-46480: Add isFailed in TaskContext") {
+    val context = TaskContext.empty()
+    var isFailed = false
+    context.addTaskCompletionListener[Unit] { context =>
+      isFailed = context.isFailed()
+    }
+    context.markTaskFailed(new RuntimeException())
+    context.markTaskCompleted(None)
+    assert(isFailed)
+  }
 }
 
 private object TaskContextSuite {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2779340e861..eb4c130cc6a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -57,7 +57,9 @@ object MimaExcludes {
     
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI$default$3"),
     
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI"),
     // [SPARK-45762][CORE] Support shuffle managers defined in user jars by 
changing startup order
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this")
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this"),
+    // [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt
+    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.isFailed")
   )
 
   // Default exclude rules
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index af958208afd..c016fd52b61 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -285,9 +285,11 @@ case class CachedRDDBuilder(
         cachedPlan.conf)
     }
     val cached = cb.mapPartitionsInternal { it =>
-      TaskContext.get().addTaskCompletionListener[Unit](_ => {
-        materializedPartitions.add(1L)
-      })
+      TaskContext.get().addTaskCompletionListener[Unit] { context =>
+        if (!context.isFailed() && !context.isInterrupted()) {
+          materializedPartitions.add(1L)
+        }
+      }
       new Iterator[CachedBatch] {
         override def hasNext: Boolean = it.hasNext
         override def next(): CachedBatch = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to