[jira] [Updated] (SPARK-4879) Missing output partitions after job completes with speculative execution

2015-09-12 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-4879:
-
Target Version/s: 1.3.0  (was: 1.0.3, 1.1.2, 1.2.2, 1.3.0)
  Labels:   (was: backport-needed)

I'm clearing "backport-needed" since it's virtually certain that there will be 
no more 1.2.x or earlier releases, and so the fix that was committed won't go 
back further at this point.

Is this something to leave open pending the ongoing conversation here? It sounds 
like there may be more to the fix.

> Missing output partitions after job completes with speculative execution
> 
>
> Key: SPARK-4879
> URL: https://issues.apache.org/jira/browse/SPARK-4879
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output, Spark Core
>Affects Versions: 1.0.2, 1.1.1, 1.2.0, 1.3.0
>Reporter: Josh Rosen
>Assignee: Josh Rosen
>Priority: Critical
> Fix For: 1.3.0
>
> Attachments: speculation.txt, speculation2.txt
>
>
> When speculative execution is enabled ({{spark.speculation=true}}), jobs that 
> save output files may report that they have completed successfully even 
> though some output partitions written by speculative tasks may be missing.
> h3. Reproduction
> This symptom was reported to me by a Spark user and I've been doing my own 
> investigation to try to come up with an in-house reproduction.
> I'm still working on a reliable local reproduction for this issue, which is a 
> little tricky because Spark won't schedule speculated tasks on the same host 
> as the original task, so you need an actual (or containerized) multi-host 
> cluster to test speculation.  Here's a simple reproduction of some of the 
> symptoms on EC2, which can be run in {{spark-shell}} with {{--conf 
> spark.speculation=true}}:
> {code}
> // Rig a job such that all but one of the tasks complete instantly
> // and one task runs for 20 seconds on its first attempt and instantly
> // on its second attempt:
> val numTasks = 100
> sc.parallelize(1 to numTasks, numTasks).repartition(2).mapPartitionsWithContext { case (ctx, iter) =>
>   if (ctx.partitionId == 0) {  // If this is the one task that should run really slow
>     if (ctx.attemptId == 0) {  // If this is the first attempt, run slow
>       Thread.sleep(20 * 1000)
>     }
>   }
>   iter
> }.map(x => (x, x)).saveAsTextFile("/test4")
> {code}
> When I run this, I end up with a job that completes quickly (due to 
> speculation) but reports failures from the speculated task:
> {code}
> [...]
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Finished task 37.1 in stage 
> 3.0 (TID 411) in 131 ms on ip-172-31-8-164.us-west-2.compute.internal 
> (100/100)
> 14/12/11 01:41:13 INFO scheduler.DAGScheduler: Stage 3 (saveAsTextFile at 
> <console>:22) finished in 0.856 s
> 14/12/11 01:41:13 INFO spark.SparkContext: Job finished: saveAsTextFile at 
> <console>:22, took 0.885438374 s
> 14/12/11 01:41:13 INFO scheduler.TaskSetManager: Ignoring task-finished event 
> for 70.1 in stage 3.0 because task 70 has already completed successfully
> scala> 14/12/11 01:41:13 WARN scheduler.TaskSetManager: Lost task 49.1 in 
> stage 3.0 (TID 413, ip-172-31-8-164.us-west-2.compute.internal): 
> java.io.IOException: Failed to save output of task: 
> attempt_201412110141_0003_m_49_413
> org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
> org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
> org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
> org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:109)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:991)
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> org.apache.spark.scheduler.Task.run(Task.scala:54)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
> {code}
> One interesting thing to note about this stack trace: if we look at 
> {{FileOutputCommitter.java:160}} 
> ([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#160]),
>  this point in the execution seems to correspond to a case where a task 
> completes, 

[jira] [Updated] (SPARK-4879) Missing output partitions after job completes with speculative execution

2015-03-05 Thread Mridul Muralidharan (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mridul Muralidharan updated SPARK-4879:
---
Affects Version/s: 1.3.0

[jira] [Updated] (SPARK-4879) Missing output partitions after job completes with speculative execution

2015-02-10 Thread Andrew Or (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Or updated SPARK-4879:
-
Fix Version/s: 1.3.0

[jira] [Updated] (SPARK-4879) Missing output partitions after job completes with speculative execution

2015-02-10 Thread Andrew Or (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Or updated SPARK-4879:
-
Labels: backport-needed  (was: )

[jira] [Updated] (SPARK-4879) Missing output partitions after job completes with speculative execution

2015-02-10 Thread Andrew Or (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Or updated SPARK-4879:
-
Target Version/s: 1.0.3, 1.3.0, 1.1.2, 1.2.2  (was: 1.0.3, 1.3.0, 1.1.2, 
1.2.1)

[jira] [Updated] (SPARK-4879) Missing output partitions after job completes with speculative execution

2015-01-07 Thread Zach Fry (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zach Fry updated SPARK-4879:

Attachment: speculation2.txt
speculation.txt

Hey Josh, 

I have been playing around with your repro above and I think I can consistently 
trigger the bad behavior by just tweaking the values of 
{{spark.speculation.multiplier}} and {{spark.speculation.quantile}}.

I set the {{multiplier}} to 1 and the {{quantile}} to 0.01, so that only 1% of 
tasks have to finish before any task that takes longer than those tasks gets 
speculated. As expected, I see a lot of tasks getting speculated. 
After running the repro about 5 times, I have seen 2 errors (stack traces are at 
the bottom, and the full runs from the REPL are attached to this comment).
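
For reference, here is a minimal sketch (not from the original comment) of 
supplying those aggressive speculation settings programmatically via 
{{SparkConf}}; it is equivalent to passing the corresponding {{--conf}} flags to 
{{spark-shell}}:

{code}
// Sketch only: aggressive speculation settings, equivalent to
//   --conf spark.speculation=true
//   --conf spark.speculation.multiplier=1.0
//   --conf spark.speculation.quantile=0.01
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("SpeculationRepro")              // hypothetical app name
  .set("spark.speculation", "true")            // enable speculative execution
  .set("spark.speculation.multiplier", "1.0")  // speculate any task slower than the median
  .set("spark.speculation.quantile", "0.01")   // only 1% of tasks must finish first
val sc = new SparkContext(conf)
{code}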

One thing I do notice is that the part-0 output associated with Stage 1 was 
always where I expected it to be in HDFS, and all of its lines were present 
(checked using {{wc -l}}).


{code}
scala> 15/01/07 13:44:26 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 
0.0 (TID 119, redacted-host-02): java.io.IOException: The temporary 
job-output directory hdfs://redacted-host-01:8020/test6/_temporary doesn't 
exist!
org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:250)
org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:240)
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:116)
org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:89)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:980)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
{code}

{code}
15/01/07 15:17:39 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 
(TID 120, redacted-host-03): org.apache.hadoop.ipc.RemoteException: No lease 
on /test7/_temporary/_attempt_201501071517__m_00_120/part-0: File 
does not exist. Holder DFSClient_NONMAPREDUCE_-469253416_73 does not have any 
open files.
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2609)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:2426)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2339)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:501)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:299)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44954)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1752)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1748)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1746)

org.apache.hadoop.ipc.Client.call(Client.java:1238)
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
com.sun.proxy.$Proxy9.addBlock(Unknown Source)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
com.sun.proxy.$Proxy9.addBlock(Unknown Source)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:291)
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1177)
{code}

[jira] [Updated] (SPARK-4879) Missing output partitions after job completes with speculative execution

2014-12-29 Thread Josh Rosen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-4879:
--
Description: 
When speculative execution is enabled ({{spark.speculation=true}}), jobs that 
save output files may report that they have completed successfully even though 
some output partitions written by speculative tasks may be missing.

h3. Reproduction

This symptom was reported to me by a Spark user and I've been doing my own 
investigation to try to come up with an in-house reproduction.

I'm still working on a reliable local reproduction for this issue, which is a 
little tricky because Spark won't schedule speculated tasks on the same host as 
the original task, so you need an actual (or containerized) multi-host cluster 
to test speculation.  Here's a simple reproduction of some of the symptoms on 
EC2, which can be run in {{spark-shell}} with {{--conf spark.speculation=true}}:

{code}
// Rig a job such that all but one of the tasks complete instantly
// and one task runs for 20 seconds on its first attempt and instantly
// on its second attempt:
val numTasks = 100
sc.parallelize(1 to numTasks, numTasks).repartition(2).mapPartitionsWithContext { case (ctx, iter) =>
  if (ctx.partitionId == 0) {  // If this is the one task that should run really slow
    if (ctx.attemptId == 0) {  // If this is the first attempt, run slow
      Thread.sleep(20 * 1000)
    }
  }
  iter
}.map(x => (x, x)).saveAsTextFile("/test4")
{code}

When I run this, I end up with a job that completes quickly (due to 
speculation) but reports failures from the speculated task:

{code}
[...]
14/12/11 01:41:13 INFO scheduler.TaskSetManager: Finished task 37.1 in stage 3.0 (TID 411) in 131 ms on ip-172-31-8-164.us-west-2.compute.internal (100/100)
14/12/11 01:41:13 INFO scheduler.DAGScheduler: Stage 3 (saveAsTextFile at <console>:22) finished in 0.856 s
14/12/11 01:41:13 INFO spark.SparkContext: Job finished: saveAsTextFile at <console>:22, took 0.885438374 s
14/12/11 01:41:13 INFO scheduler.TaskSetManager: Ignoring task-finished event for 70.1 in stage 3.0 because task 70 has already completed successfully

scala> 14/12/11 01:41:13 WARN scheduler.TaskSetManager: Lost task 49.1 in stage 3.0 (TID 413, ip-172-31-8-164.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201412110141_0003_m_49_413
org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:109)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:991)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
{code}

One interesting thing to note about this stack trace: if we look at 
{{FileOutputCommitter.java:160}} 
([link|http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/org.apache.hadoop/hadoop-core/2.5.0-mr1-cdh5.2.0/org/apache/hadoop/mapred/FileOutputCommitter.java#160]),
 this point in the execution seems to correspond to a case where a task 
completes, attempts to commit its output, fails for some reason, then deletes 
the destination file, tries again, and fails:

{code}
151          if (fs.isFile(taskOutput)) {
152            Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
153                getTempTaskOutputPath(context));
154            if (!fs.rename(taskOutput, finalOutputPath)) {
155              if (!fs.delete(finalOutputPath, true)) {
156                throw new IOException("Failed to delete earlier output of task: " + 
157                                      attemptId);
158              }
159              if (!fs.rename(taskOutput, finalOutputPath)) {
160                throw new IOException("Failed to save output of task: " + 
161                                      attemptId);
162              }
163            }

This could explain why the output file is missing: the second copy of the task 
keeps running after the job completes and deletes the output written by the 
other task after failing to commit its own copy of the output.
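
One way to observe the symptom directly (a sketch, not part of the original 
repro, reusing {{numTasks}} and the {{/test4}} path from above) is to re-read 
the saved output after the job reports success and compare counts; each input 
element maps to exactly one output line, so any shortfall means output 
partitions went missing:

{code}
// Sketch: verify the saved output after the job reports success.
// Each of the numTasks input elements produces exactly one output line,
// so a smaller count indicates missing output partitions.
val expected = numTasks
val actual = sc.textFile("/test4").count()
if (actual != expected) {
  println(s"Missing output: expected $expected lines, found $actual")
}
{code}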

There are still a few open questions about how exactly we get into this 
scenario:

*Why is the second copy of the task allowed to commit its output after