[jira] [Commented] (SPARK-19790) OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure

2018-05-21 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16482512#comment-16482512
 ] 

Steve Loughran commented on SPARK-19790:


Update on this, having spent a lot of time working in the internals of the Hadoop 
committers:

The v2 commit algorithm is broken in that if it fails *or partitions* during 
commit execution, the outcome is "undefined".
The v1 committer doesn't have this problem on any FS with atomic rename; the 
task attempt dir rename() will either fail or succeed. A partitioned executor 
may still execute its rename after its successor has committed, but as the outcome 
of any task attempt is required to be acceptable, that can be justified.
The new s3a committers handle task failure.

The only limitation then is the v2 commit algorithm, which, IMO, doesn't meet 
the semantics of a commit algorithm as required by Spark. That means that if 
people are permitted to use that algorithm, then a failure during a commit 
should be picked up and the job itself failed.

Now: how to do that?

There's currently no method in the Hadoop {{OutputCommitter}} interface to probe 
this; there's only one for job commit being repeatable. We could add one to the 
Hadoop 3.x+ {{OutputCommitter}} to aid this (or have it implement 
{{StreamCapabilities}} and declare the string -> predicate probe there). Spark 
could then query it and fail the job on a task failure during commit if the 
committer has the new method and declares that it can't handle commit retry. 
FileOutputCommitter would return true on the probe iff the v1 algorithm is in use; 
the S3A committers will always return true.
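
A rough sketch of what the Spark-side probe could look like, assuming the committer 
opts in via {{StreamCapabilities}}; the capability string and the helper are invented 
here for illustration, they are not an existing API:

{code:scala}
import org.apache.hadoop.fs.StreamCapabilities
import org.apache.hadoop.mapreduce.OutputCommitter

object CommitProbe {
  // Hypothetical capability string: nothing in Hadoop defines this today.
  val RECOVERABLE_TASK_COMMIT = "mapreduce.committer.recoverable-task-commit"

  /** True iff the committer declares that a task commit interrupted by an
   *  executor failure/partition can safely be retried by another attempt.
   *  Committers that don't implement the probe get the pessimistic answer. */
  def taskCommitRecoverable(committer: OutputCommitter): Boolean = committer match {
    case c: StreamCapabilities => c.hasCapability(RECOVERABLE_TASK_COMMIT)
    case _                     => false
  }

  // Sketch of use: if taskCommitRecoverable() is false and the authorized
  // committer's executor is lost, fail the job instead of authorizing another
  // task attempt to commit.
}
{code}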

Thoughts? 

> OutputCommitCoordinator should not allow another task to commit after an 
> ExecutorFailure
> 
>
> Key: SPARK-19790
> URL: https://issues.apache.org/jira/browse/SPARK-19790
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 2.1.0
>Reporter: Imran Rashid
>Priority: Major
>
> The OutputCommitCoordinator resets the allowed committer when the task fails. 
>  
> https://github.com/apache/spark/blob/8aa560b75e6b083b2a890c52301414285ba35c3d/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala#L143
> However, if a task fails because of an ExecutorFailure, we actually have no 
> idea what the status is of the task.  The task may actually still be running, 
> and perhaps successfully commit its output.  By allowing another task to 
> commit its output, there is a chance that multiple tasks commit, which can 
> result in corrupt output.  This would be particularly problematic when 
> commit() is an expensive operation, e.g. moving files on S3.
> For other task failures, we can allow other tasks to commit.  But with an 
> ExecutorFailure, it's not clear what the right thing to do is.  The only safe 
> thing to do may be to fail the job.
> This is related to SPARK-19631, and was discovered during discussion on that 
> PR https://github.com/apache/spark/pull/16959#discussion_r103549134






[jira] [Commented] (SPARK-19790) OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure

2017-07-15 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16088630#comment-16088630
 ] 

Steve Loughran commented on SPARK-19790:


I've now summarised the FileOutputCommitter v1 and v2 algorithms as far as I 
can understand them from the source and some step-throughs; I think I'd need to 
add a few more tests to be really sure I understand it: 
[https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md]

h2. v1

# task commits by atomic rename() of 
{{$dest/_temporary/$appAttemptId/_temporary/$taskAttemptId}} to the completed 
dir {{$dest/_temporary/$appAttemptId/$taskAttemptId}}; job commit moves all data 
under each task attempt into $dest. Note that as only one rename() to that dest 
will work, race conditions in speculative work are prevented without the need 
for explicit coordination (a sketch follows this list)

# failure during task commit: delete the completed task directory (if any) and 
rerun the task.
# job commit: list completed tasks, move/merge them into the dest dir. Not 
atomic; the number of rename operations scales with the tasks and their directory 
trees. A failure in job commit is not recoverable.
# failure during job commit: job in unknown state, rm $dest and rerun

# job restart: after failure of the entire job, it can be restarted, with the 
restarted job reusing the completed tasks. Provided rename() is atomic there's a 
guarantee that every task's completed dir is valid; provided it's O(1), it's an 
inexpensive operation
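
A minimal sketch of that v1 task-commit rename against the layout above; the path 
construction is simplified and illustrative, not the actual FileOutputCommitter code:

{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}

object V1TaskCommit {
  // Simplified: one atomic rename of the task attempt directory into the job
  // attempt's completed area. Only one attempt can win the rename; a late
  // rename from a partitioned executor simply fails.
  def commitTask(fs: FileSystem, dest: Path,
                 appAttemptId: Int, taskAttemptId: String): Unit = {
    val attemptDir   = new Path(dest, s"_temporary/$appAttemptId/_temporary/$taskAttemptId")
    val committedDir = new Path(dest, s"_temporary/$appAttemptId/$taskAttemptId")
    if (fs.exists(attemptDir) && !fs.rename(attemptDir, committedDir)) {
      throw new java.io.IOException(s"Failed to commit task attempt $taskAttemptId")
    }
    // Job commit then lists $dest/_temporary/$appAttemptId and merges every
    // committed task directory into $dest.
  }
}
{code}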

h2. v2

# task commit: copy straight to dest (this is where co-ordination is needed 
between tasks and the job manager); see the sketch after this list
# job commit: no-op
# job restart: none, start again
# failure during task commit: dest dir in unknown state, job restart needed
# failure during job commit: job in unknown state, rm $dest and rerun
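
For contrast, a simplified sketch of what the v2 task commit amounts to; again 
illustrative, not the real implementation:

{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}

object V2TaskCommit {
  // Simplified: move each file from the task attempt directory directly under
  // $dest. Not atomic -- a crash mid-loop leaves an unknown subset of the
  // task's output visible, which is why a failed v2 task commit is unrecoverable.
  def commitTask(fs: FileSystem, attemptDir: Path, dest: Path): Unit = {
    fs.listStatus(attemptDir).foreach { status =>
      val target = new Path(dest, status.getPath.getName)
      if (status.isDirectory) {
        fs.mkdirs(target)
        commitTask(fs, status.getPath, target) // recurse into subdirectories
      } else if (!fs.rename(status.getPath, target)) {
        throw new java.io.IOException(s"Failed to move ${status.getPath} to $target")
      }
    }
  }
}
{code}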

Given that Spark doesn't do job restart, switching to v2 everywhere reduces the 
number of renames, but makes recovery from a failure during task commit impossible.

Neither algorithm is correct on an inconsistent S3 endpoint, as they both can 
get incorrect listings of the files to COPY + DELETE. With consistency, you still 
have O(data) task commits with both algorithms, and another O(data) job commit 
with v1. Azure WASB is consistent, and uses leases for exclusive access to bits 
of the store on commit, but even it could do with a store-specific committer. 
Rename is not the solution to committing data in an object store.










[jira] [Commented] (SPARK-19790) OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure

2017-03-06 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898003#comment-15898003
 ] 

Steve Loughran commented on SPARK-19790:


Thinking some more & looking at code snippets

# FileOutputFormat with algorithm 2 can recover from a failed task commit, 
somehow. That code is too complex to make sense of now that it mixes the two 
algorithms with co-recursion and code that runs in both client and server.
# The S3Guard committer will have the task upload its files in parallel, but 
will not complete the multipart uploads; the information needed to do so will be 
persisted to HDFS for execution by the job committer.
# I plan a little Spark extension which will do the same for files with 
absolute destinations, this time passing the data back to the job committer.

Which means a failed task can be recovered from. All pending writes for that 
task will need to be found (by scanning the FS) and aborted. There is still an 
issue about the ordering of the PUT vs. the save of the upload data; some GC of 
pending commits to a dest dir would be the way to avoid running up bills. A rough 
sketch of the idea follows.
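
Every type, trait, and file layout in this sketch is invented for illustration; 
these are not the actual S3Guard/S3A committer classes:

{code:scala}
// Illustrative pending-commit record persisted by the task committer.
case class PendingUpload(bucket: String, key: String, uploadId: String, etags: Seq[String])

// Minimal stand-in for the store client; not a real AWS SDK interface.
trait S3Client {
  def completeMultipartUpload(upload: PendingUpload): Unit
  def abortMultipartUpload(upload: PendingUpload): Unit
}

object PendingCommits {
  // Task commit: the uploads were started but not completed; persist their
  // details (e.g. one line per upload) for the job committer to act on.
  def persist(uploads: Seq[PendingUpload], out: java.io.Writer): Unit =
    uploads.foreach { u =>
      out.write(s"${u.bucket},${u.key},${u.uploadId},${u.etags.mkString(";")}\n")
    }

  // Job commit: complete every recorded upload, making the output visible.
  def commitJob(s3: S3Client, pending: Seq[PendingUpload]): Unit =
    pending.foreach(s3.completeMultipartUpload)

  // Failed/aborted task: abort its recorded uploads so incomplete multipart
  // uploads don't accumulate and run up storage bills.
  def abortTask(s3: S3Client, pending: Seq[PendingUpload]): Unit =
    pending.foreach(s3.abortMultipartUpload)
}
{code}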

This is one of those coordination problems where someone with TLA+ algorithm 
specification skills would be good, along with the foundation specs for 
filesystems and object stores. Someone needs to find a CS student looking for a 
project.







[jira] [Commented] (SPARK-19790) OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure

2017-03-06 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897863#comment-15897863
 ] 

Steve Loughran commented on SPARK-19790:


The only time a task output committer should be making observable state changes 
is during the actual commit operation. If it is doing things before that commit 
operation, that's a bug in that it doesn't meet the goal of being a "committer".

The Hadoop output committer has two stages here: the FileOutputFormat work and 
then the rename of files; together they are not a transaction, but on a real 
filesystem they are fast.

The now-deleted DirectOutputCommitter was doing things as it went along, which is 
why it got pulled.

That leaves the Hadoop output committer committing work on object stores which 
implement rename() as a copy, hence slow and with a large enough failure 
window to matter. HADOOP-13786 is going to make that window very small indeed, 
at least for job completion.

One thing to look at here is the 
{{org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter}} protocol, where 
the committer can be asked whether or not it supports recovery 
({{isRecoverySupported()}}), as well as {{isCommitJobRepeatable()}} to probe for 
a job commit being repeatable even if it fails partway through. The committer 
gets to implement its policy there; a sketch of consulting these probes is below.
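
Both probes exist on the Hadoop {{OutputCommitter}} today; what a caller does with 
the answers in this sketch is illustrative only, not anything Spark currently does:

{code:scala}
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter}

object CommitterProbes {
  // isRecoverySupported(JobContext) and isCommitJobRepeatable(JobContext) are
  // existing OutputCommitter methods; the reporting below is illustrative.
  def describe(committer: OutputCommitter, ctx: JobContext): Unit = {
    val taskRecoverySupported = committer.isRecoverySupported(ctx)
    val jobCommitRepeatable   = committer.isCommitJobRepeatable(ctx)
    // A scheduler could, for example, refuse to authorize a second task attempt
    // to commit and fail the job instead when neither guarantee holds.
    println(s"task recovery supported: $taskRecoverySupported, " +
      s"job commit repeatable: $jobCommitRepeatable")
  }
}
{code}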








[jira] [Commented] (SPARK-19790) OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure

2017-03-01 Thread Imran Rashid (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15890654#comment-15890654
 ] 

Imran Rashid commented on SPARK-19790:
--

cc [~kayousterhout] [~markhamstra] [~mridulm80] [~pwoody]



