Github user zhonghaihua commented on a diff in the pull request: https://github.com/apache/spark/pull/12258#discussion_r65797773 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -343,17 +343,31 @@ private[spark] class TaskSchedulerImpl( } taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => + var executorId: String = null if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid).foreach { execId => + executorId = execId if (executorIdToTaskCount.contains(execId)) { executorIdToTaskCount(execId) -= 1 } } } if (state == TaskState.FINISHED) { - taskSet.removeRunningTask(tid) - taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + // In some case, executor has already removed by driver for heartbeats timeout, but + // at sometime, before executor killed by cluster, the task of running on this + // executor is finished and return task success state to driver. However, this kinds + // of task should be ignored, because the task on this executor is already re-queued + // by driver. For more details, can check in SPARK-14485. + if (executorId.ne(null) && !executorIdToTaskCount.contains(executorId)) { --- End diff -- Thanks for your comments. I will fix it soon.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org