I have some code that simulates a task failure in speculative mode.
I compile the code to a jar and execute it with

./bin/spark-submit --class com.test.SparkTest --jars --driver-memory 2g \
  --executor-memory 1g --master local[4] --conf spark.speculation=true \
  --conf spark.task.maxFailures=4 SparkTest.jar

In addition, /conf/spark-defaults.sh contains:

spark.speculation = true
spark.task.maxFailures = 4
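
As far as I understand, the same two settings could also be set programmatically on the SparkConf; this is only a minimal sketch of that alternative, not what my actual job does:

  val sparkConf = new SparkConf()
    .setAppName("SparkTest")
    .set("spark.speculation", "true")        // same as --conf spark.speculation=true
    .set("spark.task.maxFailures", "4")      // same as --conf spark.task.maxFailures=4
  val sc = new SparkContext(sparkConf)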

However, when I run my code, I don't see speculative execution working at all, and even worse, the failed task is not resubmitted and the job fails after just 1 unsuccessful task.
I have attached both the exception and the code. Can someone explain what I am doing wrong?

I need help with two things:
- Making the job fail only after 4 attempts to resubmit the task, not after the first failed task.
- Getting speculative execution to work.
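
To double-check whether these settings actually reach the application, I believe the effective values can be read back from the driver; a minimal sketch, using the SparkContext sc created in the code below:

  println("spark.speculation      = " + sc.getConf.getOption("spark.speculation"))
  println("spark.task.maxFailures = " + sc.getConf.getOption("spark.task.maxFailures"))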

Here is the exception:
********************
16/03/05 19:32:51 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:151)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:151)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:69)
        at org.apache.spark.scheduler.Task.run(Task.scala:81)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: new exception
        at com.test.SparkTest$$anonfun$1.apply(SparkTest.scala:27)
        at com.test.SparkTest$$anonfun$1.apply(SparkTest.scala:21)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegen$$anonfun$5$$anon$1.hasNext(WholeStageCodegen.scala:285)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:256)
        ... 8 more
16/03/05 19:32:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:151)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:151)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:69)
        at org.apache.spark.scheduler.Task.run(Task.scala:81)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: new exception
        at com.test.SparkTest$$anonfun$1.apply(SparkTest.scala:27)
        at com.test.SparkTest$$anonfun$1.apply(SparkTest.scala:21)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegen$$anonfun$5$$anon$1.hasNext(WholeStageCodegen.scala:285)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:256)
        ... 8 more

16/03/05 19:32:51 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/03/05 19:32:51 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.SparkException: Task failed while writing rows.

Here is the code
****************
package com.test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SparkTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkTest")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // UDF that simulates a slow, failing first attempt: for value 5 in
    // partition 0, attempt 0 sleeps and then throws; later attempts only sleep.
    val failSpeculativeTask = sqlContext.udf.register("failSpeculativeTask",
      (i: Int, partitionId: Int, attemptNumber: Int) => {
        if (partitionId == 0 && i == 5) {
          if (attemptNumber == 0) {
            Thread.sleep(15000)
            throw new Exception("new exception")
          } else {
            Thread.sleep(10000)
          }
        }
        i
      })

    // 100 rows in 20 partitions, each row tagged with the partition id and
    // task attempt number taken from the TaskContext.
    val df = sc.parallelize(1 to 100, 20).mapPartitions { iter =>
      val context = org.apache.spark.TaskContext.get()
      val partitionId = context.partitionId
      val attemptNumber = context.attemptNumber
      iter.map(i => (i, partitionId, attemptNumber))
    }.toDF("i", "partitionId", "attemptNumber")

    df.select(
        failSpeculativeTask($"i", $"partitionId", $"attemptNumber").as("i"),
        $"partitionId", $"attemptNumber")
      .write.mode("overwrite").format("csv").save("/output-directory/")
  }
}
