This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new b8750d5c0b41 [SPARK-46182][CORE] Track `lastTaskFinishTime` using the exact task finished event
b8750d5c0b41 is described below

commit b8750d5c0b416137ce802cf73dd92b0fc7ff5467
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
AuthorDate: Sun Dec 3 22:08:20 2023 -0800

    [SPARK-46182][CORE] Track `lastTaskFinishTime` using the exact task finished event
    
    ### What changes were proposed in this pull request?
    
    We found a race condition between lastTaskRunningTime and
    lastShuffleMigrationTime that could cause a decommissioned executor to exit
    before all of its shuffle blocks have been discovered. The issue could lead
    to an immediate task retry right after the executor exits, and thus to a
    longer query execution time.
    
    To fix the issue, we now update the lastTaskRunningTime only when a task
    reports a finished state through the StatusUpdate event. This is better
    than the previous approach (which used a thread to check the number of
    running tasks every second), because it makes clear whether the shuffle
    block refresh happened after all tasks finished running, resolving the
    race condition mentioned above.
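    
    As an illustration only (a minimal sketch in the spirit of this change, not
    the code in this commit; the DecommissionExitTracker, onTaskFinished and
    safeToExit names below are hypothetical), the event-driven bookkeeping
    looks roughly like this:
    
        import java.util.concurrent.atomic.AtomicLong
    
        class DecommissionExitTracker {
          // Advanced exactly once per task, when its terminal StatusUpdate is seen.
          private val lastTaskFinishTime = new AtomicLong(System.nanoTime())
    
          // Hypothetical hook invoked on the StatusUpdate path for a finished task.
          def onTaskFinished(): Unit = lastTaskFinishTime.set(System.nanoTime())
    
          // Hypothetical check used by the decommission wait loop: exiting is safe
          // only if the last migration check strictly follows the last task finish.
          def safeToExit(migrationTime: Long,
                         allBlocksMigrated: Boolean,
                         runningTasks: Int): Boolean =
            runningTasks == 0 && allBlocksMigrated &&
              migrationTime > lastTaskFinishTime.get()
        }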
    
    ### Why are the changes needed?
    
    To fix a race condition that could lead to shuffle data loss and thus to a
    longer query execution time.
    
    ### How was this patch tested?
    
    This is a very subtle race condition that is hard to cover with a unit test
    in the current test framework, and we are confident the change is low risk.
    Thus it is verified only by passing all the existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44090 from jiangxb1987/SPARK-46182.
    
    Authored-by: Xingbo Jiang <xingbo.ji...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 6f112f7b1a50a2b8a59952c69f67dd5f80ab6633)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/executor/CoarseGrainedExecutorBackend.scala    | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c695a9ec2851..537522326fc7 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -21,7 +21,7 @@ import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
@@ -80,6 +80,10 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private var decommissioned = false
 
+  // Track the last time in ns that at least one task is running. If no task is running and all
+  // shuffle/RDD data migration are done, the decommissioned executor should exit.
+  private var lastTaskFinishTime = new AtomicLong(System.nanoTime())
+
   override def onStart(): Unit = {
     if (env.conf.get(DECOMMISSION_ENABLED)) {
       val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL)
@@ -269,6 +273,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
     if (TaskState.isFinished(state)) {
       taskResources.remove(taskId)
+      lastTaskFinishTime.set(System.nanoTime())
     }
     driver match {
       case Some(driverRef) => driverRef.send(msg)
@@ -341,7 +346,6 @@ private[spark] class CoarseGrainedExecutorBackend(
 
       val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
         override def run(): Unit = {
-          var lastTaskRunningTime = System.nanoTime()
           val sleep_time = 1000 // 1s
           // This config is internal and only used by unit tests to force an executor
           // to hang around for longer when decommissioned.
@@ -358,7 +362,7 @@ private[spark] class CoarseGrainedExecutorBackend(
                 val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
                 // We can only trust allBlocksMigrated boolean value if there were no tasks running
                 // since the start of computing it.
-                if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
+                if (allBlocksMigrated && (migrationTime > lastTaskFinishTime.get())) {
                   logInfo("No running tasks, all blocks migrated, stopping.")
                   exitExecutor(0, ExecutorLossMessage.decommissionFinished, notifyDriver = true)
                 } else {
@@ -370,12 +374,6 @@ private[spark] class CoarseGrainedExecutorBackend(
               }
             } else {
               logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} 
running tasks")
-              // If there is a running task it could store blocks, so make sure we wait for a
-              // migration loop to complete after the last task is done.
-              // Note: this is only advanced if there is a running task, if there
-              // is no running task but the blocks are not done migrating this does not
-              // move forward.
-              lastTaskRunningTime = System.nanoTime()
             }
             Thread.sleep(sleep_time)
           }

