Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20244#discussion_r165759800
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1016,15 +1016,23 @@ class DAGScheduler(
         // might modify state of objects referenced in their closures. This is necessary in Hadoop
         // where the JobConf/Configuration object is not thread-safe.
         var taskBinary: Broadcast[Array[Byte]] = null
    +    var partitions: Array[Partition] = null
         try {
           // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
           // For ResultTask, serialize and broadcast (rdd, func).
    -      val taskBinaryBytes: Array[Byte] = stage match {
    -        case stage: ShuffleMapStage =>
    -          JavaUtils.bufferToArray(
    -            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    -        case stage: ResultStage =>
    -          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    +      var taskBinaryBytes: Array[Byte] = null
    +      // Add synchronized block to avoid rdd deserialized from taskBinaryBytes has diff checkpoint
    +      // status with the rdd when create ShuffleMapTask or ResultTask.
    --- End diff ---
    
    I'd reword this a bit:

    taskBinaryBytes and partitions are both affected by the checkpoint status. We need this
    synchronization in case another concurrent job is checkpointing this RDD, so we get a
    consistent view of both variables.
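
    To make the rationale concrete, here is a minimal, self-contained Scala sketch of the
    race being guarded against. Every name in it (FakeRdd, CheckpointRaceSketch, the plain
    `lock` object) is a hypothetical stand-in for the DAGScheduler internals, not the actual
    patch: without taking the lock, the two reads below could straddle a concurrent
    checkpoint and observe different lineage states.

        import java.util.concurrent.Executors

        object CheckpointRaceSketch {
          // Hypothetical stand-in for an RDD whose state flips when another job
          // checkpoints it (truncating its lineage).
          final class FakeRdd {
            @volatile var checkpointed: Boolean = false
            def serializedForm: String = if (checkpointed) "post-checkpoint" else "pre-checkpoint"
            def partitionsView: String = if (checkpointed) "post-checkpoint" else "pre-checkpoint"
          }

          // Stand-in for the shared lock the real code would take.
          private val lock = new Object

          def main(args: Array[String]): Unit = {
            val rdd  = new FakeRdd
            val pool = Executors.newFixedThreadPool(2)

            // Job-submission thread: both reads happen under the lock, so they see the
            // same checkpoint epoch -- the "consistent view of both variables" above.
            pool.submit(new Runnable {
              override def run(): Unit = lock.synchronized {
                val taskBinaryBytes = rdd.serializedForm // stands in for the serialized task binary
                val partitions      = rdd.partitionsView // stands in for stage.rdd.partitions
                assert(taskBinaryBytes == partitions, s"inconsistent: $taskBinaryBytes vs $partitions")
              }
            })

            // Checkpointing thread: mutates the RDD's state only under the same lock.
            pool.submit(new Runnable {
              override def run(): Unit = lock.synchronized { rdd.checkpointed = true }
            })

            pool.shutdown()
          }
        }

    Dropping the two `lock.synchronized` wrappers reintroduces the window in which the
    task binary is serialized before the checkpoint completes but the partitions are read
    afterwards (or vice versa), which is exactly the inconsistency the suggested comment
    describes.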

