[ https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16482512#comment-16482512 ]

Steve Loughran commented on SPARK-19790:
----------------------------------------

Update on this, having spent a lot of time working in the internals of the Hadoop 
committers.

The v2 commit algorithm is broken in that if a task fails *or is partitioned* during 
commit execution, the outcome is "undefined".
The v1 committer doesn't have this problem on any FS with atomic rename; the 
task attempt dir rename() will either fail or succeed. A partitioned executor 
may still execute its rename after its successor, but as the outcome of any 
task attempt is required to be acceptable, that can be justified.
The new S3A committers handle task failure.
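
To make that concrete, here's a minimal sketch of the two task-commit strategies 
on a local filesystem with atomic rename. It is not the real FileOutputCommitter 
code; the method names and directory layout are simplified for illustration:

{code:scala}
import java.nio.file.{Files, Path, StandardCopyOption}
import scala.jdk.CollectionConverters._

// Simplified illustration of the two task-commit strategies; not the real
// FileOutputCommitter implementation.
object CommitAlgorithmSketch {

  /** v1-style task commit: one atomic rename of the whole task attempt
   *  directory into the job attempt directory. The rename either fails or
   *  succeeds as a unit, so either all of the task's output is promoted
   *  or none of it is. */
  def commitTaskV1(taskAttemptDir: Path, jobAttemptDir: Path): Unit = {
    val dest = jobAttemptDir.resolve(taskAttemptDir.getFileName)
    Files.move(taskAttemptDir, dest, StandardCopyOption.ATOMIC_MOVE)
  }

  /** v2-style task commit: each file is promoted straight into the final
   *  output directory, one rename at a time. If the executor fails or is
   *  partitioned partway through the loop, some files are visible and some
   *  are not: the "undefined" outcome described above. */
  def commitTaskV2(taskAttemptDir: Path, finalOutputDir: Path): Unit = {
    val files = Files.list(taskAttemptDir)
    try {
      files.iterator().asScala.foreach { file =>
        Files.move(file, finalOutputDir.resolve(file.getFileName),
          StandardCopyOption.REPLACE_EXISTING)
      }
    } finally files.close()
    Files.delete(taskAttemptDir)
  }
}
{code}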

The only limitation, then, is the v2 commit algorithm, which, IMO, doesn't meet 
the semantics of a commit algorithm as required by Spark. Which means that if 
people are permitted to use that algorithm, then when it fails during a commit, 
the failure should be picked up and the job itself failed.

Now: how to do that?

There's currently no method in the Hadoop OutputCommitter interface to probe 
this; there's only one for job commit being repeatable. We can add one to the 
OutputCommitter in Hadoop 3.x+ to aid this (or have it implement 
{{StreamCapabilities}} and declare the string -> predicate mapping there). Spark 
can query it and, if the task committer has the new method and declares that it 
can't handle commit retry, fail the job when a task fails during commit. 
FileOutputCommitter would return true on the probe iff v1 was invoked; the S3A 
committers will always return true.
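
A rough sketch of what the Spark-side check could look like, assuming the probe 
is exposed through the existing {{StreamCapabilities}} interface; the capability 
string and helper below are made up for illustration, not an agreed API:

{code:scala}
import org.apache.hadoop.fs.StreamCapabilities
import org.apache.hadoop.mapreduce.OutputCommitter

// Sketch only: the capability name and helper are hypothetical; they show how
// Spark could probe a committer before trusting it with task-commit retry.
object CommitterCapabilityCheck {

  // Hypothetical capability string; whatever name Hadoop standardises on
  // would go here.
  val RecoversFromTaskCommitFailure =
    "mapreduce.committer.recovers.task.commit.failure"

  /** True iff the committer declares that a retried/duplicated task commit
   *  (e.g. after an executor fails mid-commit) still yields valid output. */
  def canHandleCommitRetry(committer: OutputCommitter): Boolean =
    committer match {
      case c: StreamCapabilities => c.hasCapability(RecoversFromTaskCommitFailure)
      case _                     => false // unknown committer: assume unsafe
    }
}
{code}

With something like that in place, the scheduler could fail the job outright 
when a task authorised to commit is lost and the probe returns false, instead 
of authorising another attempt.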

Thoughts? 

> OutputCommitCoordinator should not allow another task to commit after an 
> ExecutorFailure
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-19790
>                 URL: https://issues.apache.org/jira/browse/SPARK-19790
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>            Priority: Major
>
> The OutputCommitCoordinator resets the allowed committer when the task fails. 
>  
> https://github.com/apache/spark/blob/8aa560b75e6b083b2a890c52301414285ba35c3d/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala#L143
> However, if a task fails because of an ExecutorFailure, we actually have no 
> idea what the status is of the task.  The task may actually still be running, 
> and perhaps successfully commit its output.  By allowing another task to 
> commit its output, there is a chance that multiple tasks commit, which can 
> result in corrupt output.  This would be particularly problematic when 
> commit() is an expensive operation, e.g. moving files on S3.
> For other task failures, we can allow other tasks to commit.  But with an 
> ExecutorFailure, it's not clear what the right thing to do is.  The only safe 
> thing to do may be to fail the job.
> This is related to SPARK-19631, and was discovered during discussion on that 
> PR: https://github.com/apache/spark/pull/16959#discussion_r103549134
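
To make the window described in the quoted description concrete, here is a 
simplified model of the coordinator's authorisation state. It is not the real 
OutputCommitCoordinator (which tracks this per stage/partition/attempt); the 
type and method names are invented for illustration:

{code:scala}
// Simplified model of the race; not the real OutputCommitCoordinator.
sealed trait TaskEndReason
case object ExecutorLost extends TaskEndReason // executor failure: task state unknown
case object TaskCrashed  extends TaskEndReason // task definitely did not commit

class CommitAuthorization {
  // attempt currently authorised to commit this partition, if any
  private var authorized: Option[Int] = None

  def canCommit(attempt: Int): Boolean = synchronized {
    authorized match {
      case None =>
        authorized = Some(attempt)
        true
      case Some(a) =>
        a == attempt
    }
  }

  def taskEnded(attempt: Int, reason: TaskEndReason): Unit = synchronized {
    if (authorized.contains(attempt)) {
      reason match {
        case TaskCrashed =>
          // Safe: this attempt cannot have committed, so let another attempt try.
          authorized = None
        case ExecutorLost =>
          // Unsafe to clear: the lost attempt may still be mid-commit, or may
          // have committed already. Clearing the authorisation here is what
          // opens the window for two attempts to commit; keeping it (or failing
          // the job) avoids the corrupt-output case.
          ()
      }
    }
  }
}
{code}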


