Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1452#discussion_r15137230
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
    @@ -17,134 +17,68 @@
     
     package org.apache.spark.scheduler
     
    -import scala.language.existentials
    +import java.nio.ByteBuffer
     
     import java.io._
    -import java.util.zip.{GZIPInputStream, GZIPOutputStream}
    -
    -import scala.collection.mutable.HashMap
     
     import org.apache.spark._
    -import org.apache.spark.rdd.{RDD, RDDCheckpointData}
    -
    -private[spark] object ResultTask {
    -
    -  // A simple map between the stage id to the serialized byte array of a 
task.
    -  // Served as a cache for task serialization because serialization can be
    -  // expensive on the master node if it needs to launch thousands of tasks.
    -  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
    -
    -  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, 
Iterator[_]) => _): Array[Byte] =
    -  {
    -    synchronized {
    -      val old = serializedInfoCache.get(stageId).orNull
    -      if (old != null) {
    -        old
    -      } else {
    -        val out = new ByteArrayOutputStream
    -        val ser = SparkEnv.get.closureSerializer.newInstance()
    -        val objOut = ser.serializeStream(new GZIPOutputStream(out))
    -        objOut.writeObject(rdd)
    -        objOut.writeObject(func)
    -        objOut.close()
    -        val bytes = out.toByteArray
    -        serializedInfoCache.put(stageId, bytes)
    -        bytes
    -      }
    -    }
    -  }
    -
    -  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], 
(TaskContext, Iterator[_]) => _) =
    -  {
    -    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    -    val ser = SparkEnv.get.closureSerializer.newInstance()
    -    val objIn = ser.deserializeStream(in)
    -    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
    -    val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) 
=> _]
    -    (rdd, func)
    -  }
    -
    -  def removeStage(stageId: Int) {
    -    serializedInfoCache.remove(stageId)
    -  }
    -
    -  def clearCache() {
    -    synchronized {
    -      serializedInfoCache.clear()
    -    }
    -  }
    -}
    -
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.RDD
     
     /**
      * A task that sends back the output to the driver application.
      *
    - * See [[org.apache.spark.scheduler.Task]] for more information.
    + * See [[Task]] for more information.
      *
      * @param stageId id of the stage this task belongs to
    - * @param rdd input to func
    + * @param rddBinary broadcast version of of the serialized RDD
    --- End diff --
    
    also past tense -- broadcasted


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to