Repository: spark
Updated Branches:
  refs/heads/master 161d0b4a4 -> 3b6ef2c53


[SPARK-7655][Core] Deserializing value should not hold the TaskSchedulerImpl lock

We should not call `DirectTaskResult.value` when holding the 
`TaskSchedulerImpl` lock. It may cost dozens of seconds to deserialize a large 
object.
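
To make the failure mode concrete: if deserialization happens inside a
synchronized block, every thread that needs the same lock stalls for the full
deserialization time. A minimal sketch, not Spark's actual code (the class and
method names below are made up for illustration, and plain Java serialization
stands in for Spark's serializer):

    import java.io.{ByteArrayInputStream, ObjectInputStream}

    class SlowResultHolder(bytes: Array[Byte]) {
      private val lock = new Object

      // Bad: every caller contends on `lock` for the full deserialization
      // time. This is the pattern the commit removes.
      def valueUnderLock(): AnyRef = lock.synchronized {
        new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject()
      }

      // Better: pay the deserialization cost lock-free, then keep the
      // critical section down to the cheap hand-off.
      def valueOutsideLock(use: AnyRef => Unit): Unit = {
        val v = new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject()
        lock.synchronized(use(v))
      }
    }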

Author: zsxwing <zsxw...@gmail.com>

Closes #6195 from zsxwing/SPARK-7655 and squashes the following commits:

21f502e [zsxwing] Add more comments
e25fa88 [zsxwing] Add comments
15010b5 [zsxwing] Deserialize value should not hold the TaskSchedulerImpl lock


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b6ef2c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b6ef2c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b6ef2c5

Branch: refs/heads/master
Commit: 3b6ef2c5391b528ef989e24400fbb0c496c3b245
Parents: 161d0b4
Author: zsxwing <zsxw...@gmail.com>
Authored: Sat May 16 21:03:22 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat May 16 21:03:22 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/TaskResult.scala | 23 ++++++++++++++++++--
 .../spark/scheduler/TaskResultGetter.scala      |  4 ++++
 .../apache/spark/scheduler/TaskSetManager.scala |  6 +++++
 3 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3b6ef2c5/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 1f114a0..8b2a742 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -40,6 +40,9 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
     var metrics: TaskMetrics)
   extends TaskResult[T] with Externalizable {
 
+  private var valueObjectDeserialized = false
+  private var valueObject: T = _
+
   def this() = this(null.asInstanceOf[ByteBuffer], null, null)
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
@@ -72,10 +75,26 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
       }
     }
     metrics = in.readObject().asInstanceOf[TaskMetrics]
+    valueObjectDeserialized = false
   }
 
+  /**
+   * When `value()` is called for the first time, it needs to deserialize `valueObject` from
+   * `valueBytes`. It may cost dozens of seconds for a large instance, so when calling `value()`
+   * for the first time, the caller should avoid blocking other threads.
+   *
+   * After the first time, `value()` is trivial and just returns the deserialized `valueObject`.
+   */
   def value(): T = {
-    val resultSer = SparkEnv.get.serializer.newInstance()
-    resultSer.deserialize(valueBytes)
+    if (valueObjectDeserialized) {
+      valueObject
+    } else {
+      // This should not run when holding a lock because it may cost dozens of seconds for a
+      // large value.
+      val resultSer = SparkEnv.get.serializer.newInstance()
+      valueObject = resultSer.deserialize(valueBytes)
+      valueObjectDeserialized = true
+      valueObject
+    }
   }
 }
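
In isolation, the change above is memoize-on-first-read. A standalone sketch
of the same pattern, assuming a pluggable deserializer instead of Spark's
`SparkEnv.get.serializer` (`CachedValue` is an illustrative name, not a Spark
API):

    import java.nio.ByteBuffer

    class CachedValue[T](bytes: ByteBuffer, deserialize: ByteBuffer => T) {
      private var deserialized = false
      private var cached: T = _

      def value(): T = {
        if (!deserialized) {
          // Potentially slow for large payloads; arrange for the first
          // call to happen while no lock is held.
          cached = deserialize(bytes)
          deserialized = true
        }
        cached
      }
    }

A boolean flag, rather than a null check or an Option, mirrors the diff: a
legitimately null result must still count as already deserialized.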

http://git-wip-us.apache.org/repos/asf/spark/blob/3b6ef2c5/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 391827c..46a6f65 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -54,6 +54,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
               if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                 return
               }
+              // Deserialize "value" without holding any lock so that it won't block other threads.
+              // We should call it here, so that when it's called again in
+              // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
+              directResult.value()
               (directResult, serializedData.limit())
             case IndirectTaskResult(blockId, size) =>
               if (!taskSetManager.canFetchMoreResults(size)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3b6ef2c5/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 7dc3252..c4487d5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -620,6 +620,12 @@ private[spark] class TaskSetManager(
     val index = info.index
     info.markSuccessful()
     removeRunningTask(tid)
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask", which holds the
+    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+    // deserialize the value while holding the lock, as that would block other threads. So we
+    // call "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+    // Note: "result.value()" only deserializes the value the first time it is called, so here
+    // "result.value()" just returns the value and won't block other threads.
     sched.dagScheduler.taskEnded(
       tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
     if (!successful(index)) {
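
Putting the two call sites together, the intended ordering is: warm the cached
value off-lock in the result getter, then read it cheaply while the scheduler
lock is held. A condensed, self-contained sketch; the lock object and methods
below are simplified stand-ins for TaskSchedulerImpl and the methods named in
the comment above, and the sleep simulates a slow deserialization:

    object ResultFlowSketch {
      private val schedulerLock = new Object  // stand-in for the TaskSchedulerImpl lock
      private var cached: Option[String] = None

      private def value(): String = cached.getOrElse {
        Thread.sleep(1000)                    // simulated slow deserialization
        val v = "big deserialized value"
        cached = Some(v)
        v
      }

      // Plays the role of TaskResultGetter.enqueueSuccessfulTask: no lock held.
      def enqueueSuccessfulTask(): Unit = {
        value()                               // slow first call happens off-lock
        handleSuccessfulTask()
      }

      // Plays the role of TaskSetManager.handleSuccessfulTask, entered with
      // the scheduler lock held.
      private def handleSuccessfulTask(): Unit = schedulerLock.synchronized {
        println(s"taskEnded with ${value()}") // cache hit: returns immediately
      }
    }

Calling ResultFlowSketch.enqueueSuccessfulTask() pays the one-second cost
before the lock is taken; dropping the first value() call would instead hold
the lock for the whole second, which is exactly the SPARK-7655 symptom.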

