Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r93558527
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
    @@ -17,27 +17,179 @@
     
     package org.apache.spark.scheduler
     
    +import java.io._
     import java.nio.ByteBuffer
    +import java.util.Properties
     
    -import org.apache.spark.util.SerializableBuffer
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.serializer.SerializerInstance
    +import org.apache.spark.util.{ByteBufferInputStream, 
ByteBufferOutputStream}
     
     /**
      * Description of a task that gets passed onto executors to be executed, 
usually created by
      * [[TaskSetManager.resourceOffer]].
      */
    -private[spark] class TaskDescription(
    +private[spark] class TaskDescription private(
         val taskId: Long,
         val attemptNumber: Int,
         val executorId: String,
         val name: String,
    -    val index: Int,    // Index within this task's TaskSet
    -    _serializedTask: ByteBuffer)
    -  extends Serializable {
    +    val index: Int,
    +    val taskFiles: mutable.Map[String, Long],
    +    val taskJars: mutable.Map[String, Long],
    +  private var task_ : Task[_],
    +  private var taskBytes: InputStream,
    +  private var taskProps: Properties) {
    +
    +  def this(
    +      taskId: Long,
    +      attemptNumber: Int,
    +      executorId: String,
    +      name: String,
    +      index: Int,
    +      taskFiles: mutable.Map[String, Long],
    +      taskJars: mutable.Map[String, Long],
    +      task: Task[_]) {
    +    this(taskId, attemptNumber, executorId, name, index, taskFiles, 
taskJars, task,
    +      null.asInstanceOf[InputStream],
    +      null.asInstanceOf[Properties])
    +  }
    +
    +  @throws[IOException]
    +  def encode(serializer: SerializerInstance): ByteBuffer = {
    +    val out = new ByteBufferOutputStream(4096)
    +    encode(out, serializer)
    +    out.close()
    +    out.toByteBuffer
    +  }
    +
    +  @throws[IOException]
    +  def encode(outputStream: OutputStream, serializer: SerializerInstance): 
Unit = {
    +    val out = new DataOutputStream(outputStream)
    +    // Write taskId
    +    out.writeLong(taskId)
    +
    +    // Write attemptNumber
    +    out.writeInt(attemptNumber)
    +
    +    // Write executorId
    +    out.writeUTF(executorId)
    +
    +    // Write name
    +    out.writeUTF(name)
     
    -  // Because ByteBuffers are not serializable, wrap the task in a 
SerializableBuffer
    -  private val buffer = new SerializableBuffer(_serializedTask)
    +    // Write index
    +    out.writeInt(index)
     
    -  def serializedTask: ByteBuffer = buffer.value
    +    // Write taskFiles
    +    out.writeInt(taskFiles.size)
    +    for ((name, timestamp) <- taskFiles) {
    +      out.writeUTF(name)
    +      out.writeLong(timestamp)
    +    }
    +
    +    // Write taskJars
    +    out.writeInt(taskJars.size)
    +    for ((name, timestamp) <- taskJars) {
    +      out.writeUTF(name)
    +      out.writeLong(timestamp)
    +    }
    +
    +    // Write the task properties separately so it is available before full 
task deserialization.
    +    val taskProps = task_.localProperties
    +    val propertyNames = taskProps.stringPropertyNames
    +    out.writeInt(propertyNames.size())
    +    propertyNames.asScala.foreach { key =>
    +      val value = taskProps.getProperty(key)
    +      out.writeUTF(key)
    +      out.writeUTF(value)
    +    }
    +
    +    // Write the task itself and finish
    +    val serializeStream = serializer.serializeStream(out)
    +    serializeStream.writeValue(task_)
    +    serializeStream.flush()
    +  }
    +
    +  def task(ser: SerializerInstance): Task[_] = {
    +    if (task_ == null) {
    +      val deserializeStream = ser.deserializeStream(taskBytes)
    +      task_ = deserializeStream.readValue[Task[_]]()
    +      task_.localProperties = taskProps
    +      deserializeStream.close()
    +      taskProps = null
    +      taskBytes = null
    +    }
    +    task_
    --- End diff --
    
    The intent of this code modification is to move the deserialization 
operation into the TaskDescription class, so that taskBytes can be made private.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to