Hi everyone. We have a Beam pipeline running on the portable Spark
runner on EMR. With 100% on-demand Core nodes the pipeline finishes
successfully. With a mix of on-demand Core nodes and spot Task nodes
the pipeline fails every time with the error below. Does Beam have any
resiliency against losing nodes, and does it schedule with awareness of
Core vs. Task nodes?
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException:
Job aborted due to stage failure: A shuffle map stage with indeterminate
output was failed and retried. However, Spark cannot rollback the
ShuffleMapStage 5 to re-process the input data, and has to fail this
job. Please eliminate the indeterminacy by checkpointing the RDD before
repartition and try again.
        at org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60)
        at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:104)
        at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:92)
        at org.apache.beam.runners.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:199)
        at org.apache.beam.runners.spark.SparkPipelineRunner.main(SparkPipelineRunner.java:263)
... 5 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: A shuffle map stage with indeterminate output was failed and
retried. However, Spark cannot rollback the ShuffleMapStage 5 to
re-process the input data, and has to fail this job. Please eliminate
the indeterminacy by checkpointing the RDD before repartition and try
again.
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2136)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2124)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2123)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$13.apply(DAGScheduler.scala:1674)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$13.apply(DAGScheduler.scala:1666)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
Thanks,
Trevor