Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r96883996
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // Launch tasks returned by a set of resource offers
         private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
           for (task <- tasks.flatten) {
    -        val serializedTask = TaskDescription.encode(task)
    -        if (serializedTask.limit >= maxRpcMessageSize) {
    -          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
    -            try {
    -              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
    -                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
    -                "spark.rpc.message.maxSize or using broadcast variables for large values."
    -              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
    -              taskSetMgr.abort(msg)
    -            } catch {
    -              case e: Exception => logError("Exception in error callback", e)
    -            }
    -          }
    +        val serializedTask = try {
    +          TaskDescription.encode(task)
    +        } catch {
    +          case NonFatal(e) =>
    +            abortTaskSetManager(scheduler, task.taskId,
    --- End diff --
    
    @witgo 
    I think the master branch only has one small part of the problem I described.  Though [`TaskSchedulerImpl.resourceOffer`](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L360) will loop through multiple locality levels even if there was a serialization error, [`TaskSetManager.resourceOffer`](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L454) will mark the taskset as a zombie, so the later iterations turn into no-ops.  But it will still continue to launch tasks that were already successfully serialized.
    
    To put it another way, there are two different issues.  Say that the serialization error first occurs on the `n`th task scheduled in one call to `TaskSchedulerImpl.resourceOffer`:
    (1) What happens with tasks 0..(n-1)?  Are they scheduled anyway?
    (2) What happens with tasks (n+1) onwards?  Do we try to serialize them?
    
    The master branch has problem (1), but not problem (2).  This change has both problems.  I'd expect that in almost all cases n = 0.  We should make sure behavior is correct for any n, but I wouldn't stress about making it optimal.  However, when n = 0, I wouldn't want the user to get a deluge of serialization exceptions; they should just see one.
    
    So this means that you don't necessarily have to serialize *all* tasks 
first.  However, for each task, just before serialization, you should check 
whether or not the taskset is a zombie. 
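
    Roughly the shape I have in mind, as a minimal sketch.  `TaskDescription`, `TaskSetManager`, and the `encode`/`send` hooks here are hypothetical stand-ins for the real Spark internals, just to show where the zombie check goes:

    ```scala
    import scala.collection.mutable.ArrayBuffer
    import scala.util.control.NonFatal

    // Hypothetical stand-ins for illustration, not the real Spark classes.
    case class TaskDescription(taskId: Long)

    class TaskSetManager {
      @volatile var isZombie: Boolean = false
      def abort(msg: String): Unit = { isZombie = true }
    }

    def launchTasks(tasks: Seq[TaskDescription],
                    taskSetMgr: TaskSetManager,
                    encode: TaskDescription => Array[Byte],
                    send: Array[Byte] => Unit): Unit = {
      for (task <- tasks) {
        // Check the zombie flag just before serializing each task: once one
        // task fails to serialize and aborts the taskset, the remaining tasks
        // are skipped instead of each raising its own serialization error.
        if (!taskSetMgr.isZombie) {
          try {
            send(encode(task))
          } catch {
            case NonFatal(e) =>
              taskSetMgr.abort(s"Failed to serialize task ${task.taskId}: $e")
          }
        }
      }
    }
    ```

    With that check, if the error first occurs on task n, tasks 0..(n-1) still go out, the abort fires once, and tasks (n+1) onwards are never serialized.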


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to