[jira] [Commented] (SPARK-19790) OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure
[ https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16482512#comment-16482512 ] Steve Loughran commented on SPARK-19790:

Update on this, having spent a lot of time working in the internals of the Hadoop committers.

The v2 commit algorithm is broken in that if the executor fails *or partitions* during task commit, the outcome is "undefined".

The v1 committer doesn't have this problem on any FS with atomic rename: the task attempt dir rename() will either fail or succeed. A partitioned executor may still execute its rename after its successor, but as the outcome of any task attempt is required to be acceptable, that can be justified.

The new S3A committers handle task failure. The only limitation then is the v2 commit algorithm, which, IMO, doesn't meet the semantics of a commit algorithm as required by Spark. Which means that if people are permitted to use that algorithm, then when it fails during a commit, this should be picked up and the job itself failed.

Now: how to do that? There's currently no method in the Hadoop OutputCommitter interface to probe this; there's only one for job commit being repeatable. We could add one to the Hadoop 3.x+ OutputCommitter to aid this (or have it implement {{StreamCapabilities}} and declare the string -> predicate map there). Spark could query it and fail on task-commit failure if the task committer has the new method and declares that it can't handle commit retry. FileOutputCommitter would return true on the probe iff v1 was in use; the S3A committers would always return true.

Thoughts?

> OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure
>
> Key: SPARK-19790
> URL: https://issues.apache.org/jira/browse/SPARK-19790
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 2.1.0
> Reporter: Imran Rashid
> Priority: Major
>
> The OutputCommitCoordinator resets the allowed committer when the task fails:
> https://github.com/apache/spark/blob/8aa560b75e6b083b2a890c52301414285ba35c3d/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala#L143
> However, if a task fails because of an ExecutorFailure, we actually have no idea what the status of the task is. The task may actually still be running, and perhaps successfully commit its output. By allowing another task to commit its output, there is a chance that multiple tasks commit, which can result in corrupt output. This would be particularly problematic when commit() is an expensive operation, e.g. moving files on S3.
> For other task failures, we can allow other tasks to commit. But with an ExecutorFailure, it's not clear what the right thing to do is. The only safe thing to do may be to fail the job.
> This is related to SPARK-19631, and was discovered during discussion on that PR: https://github.com/apache/spark/pull/16959#discussion_r103549134

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
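The fail-fast decision proposed in the comment above can be sketched as a small model. This is not Spark or Hadoop code; the capability string and class names are hypothetical stand-ins for the probe being proposed.

```python
# A model (not Spark/Hadoop API) of the proposed fail-fast decision.
# The capability string and classes here are hypothetical.

TASK_COMMIT_RETRYABLE = "committer.task-commit-retryable"  # hypothetical name

class CommitterModel:
    """Stand-in for a committer exposing a StreamCapabilities-style probe."""
    def __init__(self, name, capabilities):
        self.name = name
        self.capabilities = set(capabilities)

    def has_capability(self, cap):
        return cap in self.capabilities

def on_task_commit_failure(committer):
    """Action the scheduler would take after a task fails or partitions
    during task commit: retry the task, or fail the whole job."""
    if committer.has_capability(TASK_COMMIT_RETRYABLE):
        return "retry-task"
    return "fail-job"

# Per the comment: FileOutputCommitter would declare the capability only
# when v1 is in use; the S3A committers would always declare it.
v1 = CommitterModel("FileOutputCommitter-v1", [TASK_COMMIT_RETRYABLE])
v2 = CommitterModel("FileOutputCommitter-v2", [])

print(on_task_commit_failure(v1))  # retry-task
print(on_task_commit_failure(v2))  # fail-job
```

The point of routing this through a string -> predicate probe rather than a new interface method is that older committers that lack the probe can default to the unsafe-to-retry answer.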
[ https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088630#comment-16088630 ] Steve Loughran commented on SPARK-19790:

I've now summarised the FileOutputCommitter v1 and v2 algorithms as far as I can understand them from the source and some step-throughs; I think I'd need to add a few more tests to be really sure I understand it: [https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md]

h2. v1
# task commit: atomic rename() of {{$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID}} to the completed dir {{$dest/_temporary/$jobAttempt/$taskAttempt}}; job commit moves all data under each task attempt into $dest. Note that as only one rename() to that destination will succeed, race conditions in speculative work are prevented without the need for explicit coordination.
# failure during task commit: delete the completed task directory (if any), rerun the task.
# job commit: list completed tasks, move/merge them into the dest dir. Not atomic; the rename operation count is per task and directory tree. A failure in job commit is not recoverable.
# failure during job commit: job in unknown state; rm $dest and rerun.
# job restart: after failure of the entire job, it can be restarted, with the restarted job reusing the completed tasks. Provided rename() is atomic, there's a guarantee that every task's completed dir is valid; provided it's O(1), it's an inexpensive operation.

h2. v2
# task commit: copy straight to dest (this is where coordination is needed between tasks and the job manager).
# job commit: no-op.
# job restart: none; start again.
# failure during task commit: dest dir in unknown state, job restart needed.
# failure during job commit: job in unknown state; rm $dest and rerun.

Given that Spark doesn't do job restart, switching to v2 everywhere reduces the number of renames, but makes recovery from failure during task commit impossible.

Neither algorithm is correct on an inconsistent S3 endpoint, as both can get incorrect listings of the files to COPY + DELETE. Even with consistency, you still have O(data) task commits with both algorithms, and another O(data) job commit with v1. Azure WASB is consistent, and uses leases for exclusive access to bits of the store on commit, but even it could do with a store-specific committer. Rename is not the solution to committing data in an object store.
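The difference between the two task-commit paths above can be made concrete with a toy simulation, using a dict as the "filesystem". This is a model for reasoning about the failure modes, not Hadoop code.

```python
# Toy model of the v1 vs v2 task-commit paths; a dict stands in for the FS.

def v1_task_commit(fs, attempt_dir, committed_dir):
    """v1: a single atomic rename of the whole task-attempt directory.
    Exactly one of two racing attempts can win."""
    if committed_dir in fs:
        return False  # a rival (e.g. speculative) attempt already committed
    fs[committed_dir] = fs.pop(attempt_dir)
    return True

def v2_task_commit(fs, attempt_files, dest, fail_at=None):
    """v2: files are promoted to the destination one at a time; a crash
    partway through leaves a partially committed, undefined state."""
    for i, (name, data) in enumerate(sorted(attempt_files.items())):
        if fail_at is not None and i == fail_at:
            return False  # simulated executor failure mid-commit
        fs[f"{dest}/{name}"] = data
    return True

# v1: two attempts race to commit the same task; exactly one rename succeeds.
fs = {"attempt_0": {"part-0": "a"}, "attempt_1": {"part-0": "b"}}
first = v1_task_commit(fs, "attempt_0", "committed/task_0")
second = v1_task_commit(fs, "attempt_1", "committed/task_0")
print(first, second)  # True False

# v2: a failure after one of two files leaves partial output visible in dest.
fs2 = {}
ok = v2_task_commit(fs2, {"part-0": "a", "part-1": "b"}, "dest", fail_at=1)
print(ok, sorted(fs2))  # False ['dest/part-0']
```

The v2 case is exactly the "dest dir in unknown state" failure mode above: there is no attempt directory left to delete and rerun from, so only a full job restart is safe.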
[ https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898003#comment-15898003 ] Steve Loughran commented on SPARK-19790:

Thinking some more and looking at code snippets:
# FileOutputCommitter with algorithm 2 can recover from a failed task commit, somehow. That code is too complex to make sense of now that it mixes two algorithms, with corecursion and logic to run in both client and server.
# The s3guard committer will have the task upload its files in parallel, but will not complete the multipart upload; the information to do so will be persisted to HDFS for execution by the job committer.
# I plan a little Spark extension which will do the same for files with absolute destinations, this time passing the data back to the job committer.

Which means a failed task can be recovered from. All pending writes for that task will need to be found (scan the FS) and aborted. There's still an issue about the ordering of the PUT vs the save of the upload data; some GC of pending commits to a destination dir would be the way to avoid running up bills.

This is one of those coordination problems where someone with TLA+ algorithm-specification skills would be good to have, along with foundation specs for filesystems and object stores. Someone needs to find a CS student looking for a project.
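The deferred-completion scheme in point 2 can be sketched as follows. The record shapes and names are illustrative, not the actual committer's on-disk format; the in-memory list stands in for the pending-commit files persisted to HDFS.

```python
# Sketch of deferring multipart-upload completion to the job committer.
# Names and record shapes are illustrative, not the real persisted format.

pending_store = []  # stands in for pending-commit files persisted to HDFS

def task_commit(task_id, upload_ids):
    # The task does NOT complete its multipart uploads; it only records
    # enough information for the job committer to complete (or abort) them.
    pending_store.append({"task": task_id, "uploads": list(upload_ids)})

def job_commit(successful_tasks):
    """Complete uploads from successful task attempts; abort everything
    else so abandoned uploads don't keep running up storage bills."""
    completed, aborted = [], []
    for record in pending_store:
        target = completed if record["task"] in successful_tasks else aborted
        target.extend(record["uploads"])
    return completed, aborted

task_commit("task_0", ["upload-a"])
task_commit("task_1_attempt_0", ["upload-b"])  # this attempt later fails
task_commit("task_1_attempt_1", ["upload-c"])  # its retry succeeds
done, gone = job_commit({"task_0", "task_1_attempt_1"})
print(done)  # ['upload-a', 'upload-c']
print(gone)  # ['upload-b']
```

Because no upload becomes visible until the job committer completes it, the failed attempt's data is never observable in the destination; it only costs storage until the abort (or a GC pass) cleans it up.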
[ https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897863#comment-15897863 ] Steve Loughran commented on SPARK-19790:

The only time a task output committer should be making observable state changes is during the actual commit operation. If it is doing things before that commit operation, that's a bug, in that it doesn't meet the goal of being a "committer". The Hadoop output committer has two stages here: the FileOutputFormat work and then the rename of the files; together they are not a transaction, but on a real filesystem they are fast.

The now-deleted DirectOutputCommitter was doing things as it went along, but that's why it got pulled. That leaves the Hadoop output committer committing work on object stores which implement rename() as a copy, hence slow and with a large enough failure window. HADOOP-13786 is going to make that window very small indeed, at least for job completion.

One thing to look at here is the {{org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter}} protocol, where the committer can be asked whether or not it supports recovery, as well as {{isCommitJobRepeatable}} to probe for a job commit being repeatable even if it fails partway through. The committer gets to implement its policy there.
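A scheduler's use of those two probes can be modelled as below. The real methods live on Hadoop's OutputCommitter ({{isRecoverySupported}}, {{isCommitJobRepeatable}}); the class and the return values here are a simplified stand-in, and the version-dependent behaviour is an assumption, not the actual Hadoop implementation.

```python
# Simplified model of the OutputCommitter recovery/repeatability probes.
# Return values are assumed for illustration, not actual Hadoop behaviour.

class FileOutputCommitterModel:
    def __init__(self, algorithm_version):
        self.algorithm_version = algorithm_version

    def is_recovery_supported(self):
        # Assumption: v1 keeps per-task committed directories around, so a
        # restarted job attempt can recover completed tasks.
        return self.algorithm_version == 1

    def is_commit_job_repeatable(self):
        # Assumption: a v2 job commit is close to a no-op, so repeating it
        # after a partial failure is harmless.
        return self.algorithm_version == 2

def job_failure_policy(committer):
    """Policy a scheduler might derive from the probes after a failure
    during job commit."""
    if committer.is_commit_job_repeatable():
        return "retry-job-commit"
    if committer.is_recovery_supported():
        return "recover-completed-tasks"
    return "rerun-job"

print(job_failure_policy(FileOutputCommitterModel(1)))  # recover-completed-tasks
print(job_failure_policy(FileOutputCommitterModel(2)))  # retry-job-commit
```

The gap this issue points at is that both probes describe job commit; neither answers the question that matters here, namely whether a second attempt may safely commit a task after an ExecutorFailure mid task commit.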
[ https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890654#comment-15890654 ] Imran Rashid commented on SPARK-19790:
--
cc [~kayousterhout] [~markhamstra] [~mridulm80] [~pwoody]