Repository: spark
Updated Branches:
  refs/heads/branch-2.2 14b5dbfa9 -> 73263b215


[SPARK-23053][CORE] taskBinarySerialization and task partitions calculate in 
DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

## What changes were proposed in this pull request?

When we run concurrent jobs using the same rdd which is marked to do 
checkpoint. If one job has finished running the job, and start the process of 
RDD.doCheckpoint, while another job is submitted, then submitStage and 
submitMissingTasks will be called. In 
[submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961),
 will serialize taskBinaryBytes and calculate task partitions which are both 
affected by the status of checkpoint, if the former is calculated before 
doCheckpoint finished, while the latter is calculated after doCheckpoint 
finished, when run task, rdd.compute will be called, for some rdds with 
particular partition type such as 
[UnionRDD](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala)
 who will do partition type cast, will get a ClassCastException because the 
part params is actually a CheckpointRDDPartition.
This error occurs  because rdd.doCheckpoint occurs in the same thread that 
called sc.runJob, while the task serialization occurs in the DAGSchedulers 
event loop.

## How was this patch tested?

the exist uts and also add a test case in DAGScheduerSuite to show the 
exception case.

Author: huangtengfei <huangtengfei@huangtengfeideMacBook-Pro.local>

Closes #20244 from ivoson/branch-taskpart-mistype.

(cherry picked from commit 091a000d27f324de8c5c527880854ecfcf5de9a4)
Signed-off-by: Imran Rashid <iras...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73263b21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73263b21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73263b21

Branch: refs/heads/branch-2.2
Commit: 73263b2150ededd8000b3fe4f276625d90de2507
Parents: 14b5dbf
Author: huangtengfei <huangtengfei@huangtengfeideMacBook-Pro.local>
Authored: Tue Feb 13 09:59:21 2018 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Tue Feb 13 10:00:23 2018 -0600

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 27 +++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/73263b21/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b009424..87e407f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, 
ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -992,15 +992,24 @@ class DAGScheduler(
     // might modify state of objects referenced in their closures. This is 
necessary in Hadoop
     // where the JobConf/Configuration object is not thread-safe.
     var taskBinary: Broadcast[Array[Byte]] = null
+    var partitions: Array[Partition] = null
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
-      val taskBinaryBytes: Array[Byte] = stage match {
-        case stage: ShuffleMapStage =>
-          JavaUtils.bufferToArray(
-            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
-        case stage: ResultStage =>
-          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, 
stage.func): AnyRef))
+      var taskBinaryBytes: Array[Byte] = null
+      // taskBinaryBytes and partitions are both effected by the checkpoint 
status. We need
+      // this synchronization in case another concurrent job is checkpointing 
this RDD, so we get a
+      // consistent view of both variables.
+      RDDCheckpointData.synchronized {
+        taskBinaryBytes = stage match {
+          case stage: ShuffleMapStage =>
+            JavaUtils.bufferToArray(
+              closureSerializer.serialize((stage.rdd, stage.shuffleDep): 
AnyRef))
+          case stage: ResultStage =>
+            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, 
stage.func): AnyRef))
+        }
+
+        partitions = stage.rdd.partitions
       }
 
       taskBinary = sc.broadcast(taskBinaryBytes)
@@ -1025,7 +1034,7 @@ class DAGScheduler(
           stage.pendingPartitions.clear()
           partitionsToCompute.map { id =>
             val locs = taskIdToLocations(id)
-            val part = stage.rdd.partitions(id)
+            val part = partitions(id)
             stage.pendingPartitions += id
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
               taskBinary, part, locs, properties, serializedTaskMetrics, 
Option(jobId),
@@ -1035,7 +1044,7 @@ class DAGScheduler(
         case stage: ResultStage =>
           partitionsToCompute.map { id =>
             val p: Int = stage.partitions(id)
-            val part = stage.rdd.partitions(p)
+            val part = partitions(p)
             val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId,
               taskBinary, part, locs, id, properties, serializedTaskMetrics,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to