Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1df8020e1 -> 24fe6eb0f


[SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization and task partitions 
calculate in DagScheduler.submitMissingTasks should keep the same RDD 
checkpoint status

## What changes were proposed in this pull request?
This PR backports [#20244](https://github.com/apache/spark/pull/20244)

When we run concurrent jobs using the same rdd which is marked to do 
checkpoint. If one job has finished running the job, and start the process of 
RDD.doCheckpoint, while another job is submitted, then submitStage and 
submitMissingTasks will be called. In 
[submitMissingTasks](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L932),
 will serialize taskBinaryBytes and calculate task partitions which are both 
affected by the status of checkpoint, if the former is calculated before 
doCheckpoint finished, while the latter is calculated after doCheckpoint 
finished, when run task, rdd.compute will be called, for some rdds with 
particular partition type such as 
[UnionRDD](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala)
 who will do partition type cast, will get a ClassCastException because the 
part params is actually a CheckpointRDDPartition.
This error occurs  because rdd.doCheckpoint occurs in the same thread that 
called sc.runJob, while the task serialization occurs in the DAGSchedulers 
event loop.

## How was this patch tested?
the exist tests.

Author: huangtengfei <huangtengfei@huangtengfeideMacBook-Pro.local>

Closes #20635 from ivoson/branch-2.1-23053.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24fe6eb0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24fe6eb0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24fe6eb0

Branch: refs/heads/branch-2.1
Commit: 24fe6eb0f6e2fe31d1c93fc65bc294ebb94e2dcf
Parents: 1df8020
Author: huangtengfei <huangtengfei@huangtengfeideMacBook-Pro.local>
Authored: Tue Feb 20 21:01:45 2018 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Tue Feb 20 21:01:45 2018 -0600

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 27 +++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24fe6eb0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 01a95c0..9d46d69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, 
ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -982,15 +982,24 @@ class DAGScheduler(
     // might modify state of objects referenced in their closures. This is 
necessary in Hadoop
     // where the JobConf/Configuration object is not thread-safe.
     var taskBinary: Broadcast[Array[Byte]] = null
+    var partitions: Array[Partition] = null
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
-      val taskBinaryBytes: Array[Byte] = stage match {
-        case stage: ShuffleMapStage =>
-          JavaUtils.bufferToArray(
-            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
-        case stage: ResultStage =>
-          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, 
stage.func): AnyRef))
+      var taskBinaryBytes: Array[Byte] = null
+      // taskBinaryBytes and partitions are both effected by the checkpoint 
status. We need
+      // this synchronization in case another concurrent job is checkpointing 
this RDD, so we get a
+      // consistent view of both variables.
+      RDDCheckpointData.synchronized {
+        taskBinaryBytes = stage match {
+          case stage: ShuffleMapStage =>
+            JavaUtils.bufferToArray(
+              closureSerializer.serialize((stage.rdd, stage.shuffleDep): 
AnyRef))
+          case stage: ResultStage =>
+            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, 
stage.func): AnyRef))
+        }
+
+        partitions = stage.rdd.partitions
       }
 
       taskBinary = sc.broadcast(taskBinaryBytes)
@@ -1013,7 +1022,7 @@ class DAGScheduler(
         case stage: ShuffleMapStage =>
           partitionsToCompute.map { id =>
             val locs = taskIdToLocations(id)
-            val part = stage.rdd.partitions(id)
+            val part = partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
               taskBinary, part, locs, stage.latestInfo.taskMetrics, 
properties, Option(jobId),
               Option(sc.applicationId), sc.applicationAttemptId)
@@ -1022,7 +1031,7 @@ class DAGScheduler(
         case stage: ResultStage =>
           partitionsToCompute.map { id =>
             val p: Int = stage.partitions(id)
-            val part = stage.rdd.partitions(p)
+            val part = partitions(p)
             val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId,
               taskBinary, part, locs, id, properties, 
stage.latestInfo.taskMetrics,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to