This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c8e7eb1  [SPARK-26774][CORE] Update some docs on TaskSchedulerImpl.
c8e7eb1 is described below

commit c8e7eb1fa7b504ececfb36aa48860762dc747351
Author: Imran Rashid <iras...@cloudera.com>
AuthorDate: Thu Feb 28 11:30:20 2019 -0800

    [SPARK-26774][CORE] Update some docs on TaskSchedulerImpl.
    
    A couple of places in TaskSchedulerImpl could use a minor doc update on
    threading concerns.  There is one bug fix here, but only in
    sc.killTaskAttempt(), which is probably not used much.
    
    Closes #23874 from squito/SPARK-26774.
    
    Authored-by: Imran Rashid <iras...@cloudera.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
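
For context on the lock-ordering rule that the updated class comment spells
out, here is a minimal sketch of the hazard. Backend and Scheduler below are
hypothetical stand-ins, not Spark's actual SchedulerBackend and
TaskSchedulerImpl APIs:

    // Minimal sketch of the lock-ordering hazard the updated comment
    // describes. All names here are hypothetical illustrations.
    class Backend {
      def reviveOffers(scheduler: Scheduler): Unit = synchronized {
        // Lock order here is backend -> scheduler: the backend locks
        // itself, then calls into the scheduler, which locks itself.
        scheduler.makeOffers()
      }
    }

    class Scheduler {
      def makeOffers(): Unit = synchronized {
        // Safe: only the scheduler's own lock is held here.
      }

      def badSendToBackend(backend: Backend): Unit = synchronized {
        // Deadlock hazard: this takes scheduler -> backend, the reverse
        // of reviveOffers above, so two threads can block each other.
        backend.synchronized {
          // ... send an event to the backend ...
        }
      }
    }

This is why the comment insists the scheduler must never try to lock the
backend while holding a lock on itself.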
---
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d551fb7..3f23bfe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,7 +51,12 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
  * threads, so it needs locks in public API methods to maintain its state. In addition, some
  * [[SchedulerBackend]]s synchronize on themselves when they want to send events here, and then
  * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
- * we are holding a lock on ourselves.
+ * we are holding a lock on ourselves.  This class is called from many threads, notably:
+ *   * The DAGScheduler Event Loop
+ *   * The RPCHandler threads, responding to status updates from Executors
+ *   * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay
+ *      scheduling
+ *   * task-result-getter threads
  */
 private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
@@ -89,11 +94,12 @@ private[spark] class TaskSchedulerImpl(
   val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
 
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
-  // on this class.
+  // on this class.  Protected by `this`
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
 
   // Protected by `this`
   private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
+  // Protected by `this`
   val taskIdToExecutorId = new HashMap[Long, String]
 
   @volatile private var hasReceivedTask = false
@@ -254,7 +260,10 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean = {
+  override def killTaskAttempt(
+      taskId: Long,
+      interruptThread: Boolean,
+      reason: String): Boolean = synchronized {
     logInfo(s"Killing task $taskId: $reason")
     val execId = taskIdToExecutorId.get(taskId)
     if (execId.isDefined) {
@@ -825,9 +834,10 @@ private[spark] class TaskSchedulerImpl(
 
   override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()
 
+  // exposed for testing
   private[scheduler] def taskSetManagerForAttempt(
       stageId: Int,
-      stageAttemptId: Int): Option[TaskSetManager] = {
+      stageAttemptId: Int): Option[TaskSetManager] = synchronized {
     for {
       attempts <- taskSetsByStageIdAndAttempt.get(stageId)
       manager <- attempts.get(stageAttemptId)

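To illustrate the one bug fix: killTaskAttempt() reads taskIdToExecutorId, a
mutable HashMap that the comments mark as protected by `this`, so the read
must hold the lock. A minimal sketch of that pattern, with TaskTable and its
members as hypothetical stand-ins rather than Spark's actual classes:

    import scala.collection.mutable.HashMap

    // Minimal sketch: every access to a map "Protected by `this`" must
    // hold the owner's lock. TaskTable is a hypothetical illustration.
    class TaskTable {
      // Mutable and not thread safe, so guarded by `this`, like
      // taskIdToExecutorId in TaskSchedulerImpl.
      private val taskIdToExecutorId = new HashMap[Long, String]

      def register(taskId: Long, execId: String): Unit = synchronized {
        taskIdToExecutorId(taskId) = execId
      }

      // Before the patch, the analogous lookup in killTaskAttempt() ran
      // without the lock and could race with concurrent writers; wrapping
      // the whole method in `synchronized`, as the patch does, fixes that.
      def executorFor(taskId: Long): Option[String] = synchronized {
        taskIdToExecutorId.get(taskId)
      }
    }

The same treatment is applied to taskSetManagerForAttempt(), which reads the
taskSetsByStageIdAndAttempt map that is likewise protected by `this`.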
