[ https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16482512#comment-16482512 ]
Steve Loughran commented on SPARK-19790:
----------------------------------------

Update on this, having spent a lot of time working in the internals of the Hadoop committers.

The v2 commit algorithm is broken in that if it fails *or partitions* during commit execution, the outcome is "undefined".

The v1 committer doesn't have this problem on any FS with atomic rename; the task attempt dir rename() will either fail or succeed. A partitioned executor may still execute its rename after its successor, but as the outcome of any task attempt is required to be acceptable, that can be justified.

The new S3A committers handle task failure.

The only limitation then is the v2 commit algorithm, which, IMO, doesn't meet the semantics of a commit algorithm as required by Spark. Which means that if people are permitted to use that algorithm, then a failure during task commit should be picked up and the job itself failed.

Now: how to do that? There's currently no method in the Hadoop OutputCommitter interface to probe this; there's only one for job commit being repeatable. We can add one to the Hadoop 3.x+ OutputCommitter to aid this (or have it implement {{StreamCapabilities}} and declare the string -> predicate mapping there). Spark can query it and fail the job when a task fails during commit if the committer has the new method and declares that it can't handle commit retry. A rough sketch of the {{StreamCapabilities}} option follows at the end of this message.

FileOutputCommitter would return true on the probe iff the v1 algorithm is in use; the S3A committers would always return true.

Thoughts?

> OutputCommitCoordinator should not allow another task to commit after an
> ExecutorFailure
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-19790
>                 URL: https://issues.apache.org/jira/browse/SPARK-19790
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>            Priority: Major
>
> The OutputCommitCoordinator resets the allowed committer when the task fails.
> https://github.com/apache/spark/blob/8aa560b75e6b083b2a890c52301414285ba35c3d/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala#L143
> However, if a task fails because of an ExecutorFailure, we actually have no
> idea what the status of the task is. The task may actually still be running,
> and perhaps successfully commit its output. By allowing another task to
> commit its output, there is a chance that multiple tasks commit, which can
> result in corrupt output. This would be particularly problematic when
> commit() is an expensive operation, e.g. moving files on S3.
> For other task failures, we can allow other tasks to commit. But with an
> ExecutorFailure, it's not clear what the right thing to do is. The only safe
> thing to do may be to fail the job.
> This is related to SPARK-19631, and was discovered during discussion on that
> PR: https://github.com/apache/spark/pull/16959#discussion_r103549134
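To make the {{StreamCapabilities}} option above concrete, here is a minimal driver-side sketch in Scala. It is illustrative only: the capability string and the helper object are assumptions for this sketch, not existing Hadoop or Spark APIs, and today's committers do not declare any such capability.

{code}
import org.apache.hadoop.fs.StreamCapabilities
import org.apache.hadoop.mapreduce.OutputCommitter

object CommitterCapabilityProbe {

  // Hypothetical capability name: an assumption for this sketch, not a
  // constant defined anywhere in Hadoop today.
  val RecoverableTaskCommit: String =
    "fs.capability.outputcommitter.recoverable.task.commit"

  /**
   * Ask a committer whether another task attempt may safely be allowed to
   * commit after the first attempt's executor was lost mid-commit.
   *
   * Committers that don't implement StreamCapabilities, or don't declare
   * the capability, are treated conservatively: the caller should fail the
   * job rather than hand the commit permission to another attempt.
   */
  def taskCommitIsRecoverable(committer: OutputCommitter): Boolean =
    committer match {
      case c: StreamCapabilities => c.hasCapability(RecoverableTaskCommit)
      case _                     => false
    }
}
{code}

On the Spark side, the OutputCommitCoordinator (or HadoopMapReduceCommitProtocol) could invoke such a probe when an executor holding the commit permission is lost: if it returns false (e.g. FileOutputCommitter with the v2 algorithm), abort the stage rather than authorize a second attempt; if it returns true (v1 with atomic rename, or the S3A committers), re-allow the commit as happens today.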