Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1452#discussion_r15142372
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala ---
    @@ -17,134 +17,68 @@
     
     package org.apache.spark.scheduler
     
    -import scala.language.existentials
    +import java.nio.ByteBuffer
     
     import java.io._
    -import java.util.zip.{GZIPInputStream, GZIPOutputStream}
    -
    -import scala.collection.mutable.HashMap
     
     import org.apache.spark._
    -import org.apache.spark.rdd.{RDD, RDDCheckpointData}
    -
    -private[spark] object ResultTask {
    -
    -  // A simple map between the stage id to the serialized byte array of a task.
    -  // Served as a cache for task serialization because serialization can be
    -  // expensive on the master node if it needs to launch thousands of tasks.
    -  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
    -
    -  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
    -  {
    -    synchronized {
    -      val old = serializedInfoCache.get(stageId).orNull
    -      if (old != null) {
    -        old
    -      } else {
    -        val out = new ByteArrayOutputStream
    -        val ser = SparkEnv.get.closureSerializer.newInstance()
    -        val objOut = ser.serializeStream(new GZIPOutputStream(out))
    -        objOut.writeObject(rdd)
    -        objOut.writeObject(func)
    -        objOut.close()
    -        val bytes = out.toByteArray
    -        serializedInfoCache.put(stageId, bytes)
    -        bytes
    -      }
    -    }
    -  }
    -
    -  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
    -  {
    -    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    -    val ser = SparkEnv.get.closureSerializer.newInstance()
    -    val objIn = ser.deserializeStream(in)
    -    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
    -    val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
    -    (rdd, func)
    -  }
    -
    -  def removeStage(stageId: Int) {
    -    serializedInfoCache.remove(stageId)
    -  }
    -
    -  def clearCache() {
    -    synchronized {
    -      serializedInfoCache.clear()
    -    }
    -  }
    -}
    -
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.RDD
     
     /**
      * A task that sends back the output to the driver application.
      *
    - * See [[org.apache.spark.scheduler.Task]] for more information.
    + * See [[Task]] for more information.
      *
      * @param stageId id of the stage this task belongs to
    - * @param rdd input to func
    + * @param rddBinary broadcast version of the serialized RDD
      * @param func a function to apply on a partition of the RDD
    - * @param _partitionId index of the number in the RDD
    + * @param partition partition of the RDD this task is associated with
      * @param locs preferred task execution locations for locality scheduling
      * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
      *                 input RDD's partitions).
      */
     private[spark] class ResultTask[T, U](
         stageId: Int,
    -    var rdd: RDD[T],
    -    var func: (TaskContext, Iterator[T]) => U,
    -    _partitionId: Int,
    +    val rddBinary: Broadcast[Array[Byte]],
    +    val func: (TaskContext, Iterator[T]) => U,
    +    val partition: Partition,
         @transient locs: Seq[TaskLocation],
    -    var outputId: Int)
    -  extends Task[U](stageId, _partitionId) with Externalizable {
    -
    -  def this() = this(0, null, null, 0, null, 0)
    -
    -  var split = if (rdd == null) null else rdd.partitions(partitionId)
    +    val outputId: Int)
    +  extends Task[U](stageId, partition.index) with Serializable {
    --- End diff --
    
    @mateiz and I looked and it seems so.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to