Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r95753220
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
    @@ -52,7 +55,43 @@ private[spark] class TaskDescription(
         val addedFiles: Map[String, Long],
         val addedJars: Map[String, Long],
         val properties: Properties,
    -    val serializedTask: ByteBuffer) {
    +    private var serializedTask_ : ByteBuffer) extends  Logging {
    --- End diff --
    
    How about this?
    
    ``` scala
    private[spark] class TaskDescription(
        val taskId: Long,
        val attemptNumber: Int,
        val executorId: String,
        val name: String,
        val index: Int,    // Index within this task's TaskSet
        val addedFiles: Map[String, Long],
        val addedJars: Map[String, Long],
        val properties: Properties,
        private var serializedTask_ : ByteBuffer) extends  Logging {
    
      def this(
          taskId: Long,
          attemptNumber: Int,
          executorId: String,
          name: String,
          index: Int, // Index within this task's TaskSet
          addedFiles: Map[String, Long],
          addedJars: Map[String, Long],
          properties: Properties,
          task: Task[_]) {
          this(taskId, attemptNumber, executorId, name, index,
            addedFiles, addedJars, properties, null.asInstanceOf[ByteBuffer])
          task_ = task
      }
    
      private var task_ : Task[_] = null
    
      private  def serializedTask: ByteBuffer = {
        if (serializedTask_ == null) {
          // This is where we serialize the task on the driver before sending 
it to the executor.
          // This is not done when creating the TaskDescription so we can 
postpone this serialization
          // to later in the scheduling process -- particularly,
          // so it can happen in another thread by the 
CoarseGrainedSchedulerBackend.
          // On the executors, this will already be populated by decode
          serializedTask_ = try {
            ByteBuffer.wrap(Utils.serialize(task_))
          } catch {
            case NonFatal(e) =>
              val msg = s"Failed to serialize task $taskId, not attempting to 
retry it."
              logError(msg, e)
              throw new TaskNotSerializableException(e)
          }
        }
        serializedTask_
      }
    
      def getTask[_](loader: ClassLoader): Task[_] = {
        if (task_ == null) {
          task_ = Utils.deserialize(serializedTask, 
loader).asInstanceOf[Task[_]]
        }
        return task_
      }
    
      override def toString: String = "TaskDescription(TID=%d, 
index=%d)".format(taskId, index)
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to