I have some code that simulates a task failure in speculative mode. I compile the code to a jar and execute it with:
./bin/spark-submit --class com.test.SparkTest --jars --driver-memory 2g --executor-memory 1g --master local[4] --conf spark.speculation=true --conf spark.task.maxFailures=4 SparkTest.jar

In addition, /conf/spark-defaults.sh contains:

spark.speculation = true
spark.task.maxFailures = 4

(A sketch of setting the same configuration programmatically follows the exception below.)

However, when I run the code, speculative mode never kicks in and, even worse, the failed task is not resubmitted at all: the job fails after only 1 unsuccessful task. Both the exception and the code are attached below. Can someone explain to me what I am doing wrong? I need:

- the job to fail only after 4 attempts to resubmit the task, not after 1 failed task;
- speculative mode to function.

Here is the exception:
********************

16/03/05 19:32:51 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:151)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:151)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:69)
        at org.apache.spark.scheduler.Task.run(Task.scala:81)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: new exception
        at com.test.SparkTest$$anonfun$1.apply(SparkTest.scala:27)
        at com.test.SparkTest$$anonfun$1.apply(SparkTest.scala:21)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegen$$anonfun$5$$anon$1.hasNext(WholeStageCodegen.scala:285)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:256)
        ... 8 more
16/03/05 19:32:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.SparkException: Task failed while writing rows.
        [same stack trace as above]
16/03/05 19:32:51 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/03/05 19:32:51 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.SparkException: Task failed while writing rows.
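For reference, this is how I understand the same settings could be applied programmatically on the SparkConf instead of through spark-submit and the defaults file. It is only a minimal sketch: the object name is hypothetical, and the two extra speculation knobs are standard Spark settings shown at their documented defaults, not something SparkTest.jar currently sets.

import org.apache.spark.{SparkConf, SparkContext}

object SpeculationConfProbe {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SpeculationConfProbe")
      .setMaster("local[4]")
      .set("spark.speculation", "true")           // launch speculative copies of slow tasks
      .set("spark.speculation.multiplier", "1.5") // "slow" = 1.5x the median task duration
      .set("spark.speculation.quantile", "0.75")  // check only after 75% of tasks have finished
      .set("spark.task.maxFailures", "4")         // abort the job only after 4 failed attempts
    val sc = new SparkContext(conf)
    // Print the values the scheduler actually sees, to rule out
    // the settings being dropped on the way in:
    println("spark.speculation      = " + sc.getConf.get("spark.speculation"))
    println("spark.task.maxFailures = " + sc.getConf.get("spark.task.maxFailures"))
    sc.stop()
  }
}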
Here is the code:
****************

package com.test

import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.sql.SQLContext

object SparkTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkTest")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Make one value in partition 0 fail on the first attempt (after a long
    // sleep, so the task also looks slow); any retried attempt succeeds.
    val failSpeculativeTask = sqlContext.udf.register("failSpeculativeTask",
      (i: Int, partitionId: Int, attemptNumber: Int) => {
        if (partitionId == 0 && i == 5) {
          if (attemptNumber == 0) {
            Thread.sleep(15000)
            throw new Exception("new exception")
          } else {
            Thread.sleep(10000)
          }
        }
        i
      })

    // Tag every row with the partition id and attempt number of the task
    // that produced it.
    val df = sc.parallelize(1 to 100, 20).mapPartitions { iter =>
      val context = TaskContext.get()
      val partitionId = context.partitionId
      val attemptNumber = context.attemptNumber
      iter.map(i => (i, partitionId, attemptNumber))
    }.toDF("i", "partitionId", "attemptNumber")

    df.select(failSpeculativeTask($"i", $"partitionId", $"attemptNumber").as("i"),
        $"partitionId", $"attemptNumber")
      .write.mode("overwrite").format("csv").save("/output-directory/")
  }
}
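To help isolate whether the scheduler resubmits failed tasks at all, independent of the DataFrame/UDF/write path, here is a minimal RDD-level variant of the same failure simulation. It is only a sketch (the object name is made up): the job can succeed only if the failed task is retried, because the first attempt of partition 0 always throws.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RetryProbe {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RetryProbe")
      .set("spark.task.maxFailures", "4")
    val sc = new SparkContext(conf)
    val sums = sc.parallelize(1 to 100, 4).mapPartitions { iter =>
      val ctx = TaskContext.get()
      // Fail partition 0 on its first attempt only; a retried attempt
      // (attemptNumber > 0) passes, so collect() returns results iff
      // the scheduler actually resubmitted the failed task.
      if (ctx.partitionId == 0 && ctx.attemptNumber == 0) {
        throw new RuntimeException("simulated failure on attempt 0")
      }
      Iterator.single(iter.sum)
    }.collect()
    println("partition sums: " + sums.mkString(", "))
    sc.stop()
  }
}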