[ https://issues.apache.org/jira/browse/SPARK-43819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthew Tieman updated SPARK-43819:
-----------------------------------
    Summary: Barrier Executor Stage Not Retried on Task Failure  (was: Barrier Executor Stage Not Retried)

> Barrier Executor Stage Not Retried on Task Failure
> --------------------------------------------------
>
>                 Key: SPARK-43819
>                 URL: https://issues.apache.org/jira/browse/SPARK-43819
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 3.3.2
>            Reporter: Matthew Tieman
>            Priority: Major
>
> When running a stage in barrier execution mode, the expectation is that a
> failure in any task will cause the whole stage to be retried. However, if an
> exception is thrown from a task, the stage is not retried and the job fails.
> Running the PySpark code below causes a single task to fail, failing the
> stage without any retry.
> {code:python}
> def test_func(index: int) -> list:
>     if index == 0:
>         raise RuntimeError("Thrown from test func")
>     return []
>
> start_rdd = sc.parallelize([i for i in range(10)], 10)
> result = start_rdd.barrier().mapPartitionsWithIndex(lambda i, c: test_func(i))
> result.collect()
> {code}
>
> This failure is seen both locally via the pyspark shell and on a K8s cluster.
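>
> For reproducing outside the shell, here is a self-contained sketch of the
> snippet above (an illustration, not part of the original report: it assumes a
> local master, and uses {{local[10]}} so that all ten barrier tasks can be
> scheduled at once, since barrier mode needs a slot per task; in the pyspark
> shell {{sc}} is already provided and the setup can be skipped):
> {code:python}
> from pyspark.sql import SparkSession
>
> # Assumption: a local master with one slot per barrier task.
> spark = (SparkSession.builder
>          .master("local[10]")
>          .appName("spark-43819-repro")
>          .getOrCreate())
> sc = spark.sparkContext
>
> def test_func(index: int) -> list:
>     if index == 0:  # fail exactly one of the ten barrier tasks
>         raise RuntimeError("Thrown from test func")
>     return []
>
> start_rdd = sc.parallelize([i for i in range(10)], 10)
> result = start_rdd.barrier().mapPartitionsWithIndex(lambda i, c: test_func(i))
> result.collect()  # expected: stage retried; observed: job aborts on first failure
> {code}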
>
> Stack trace from local execution:
> {noformat}
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/rdd.py", line 1197, in collect
>     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>   File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
>   File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/sql/utils.py", line 190, in deco
>     return f(*a, **kw)
>   File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(0, 0) finished unsuccessfully.
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
>     process()
>   File "/opt/homebrew/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
>     out_iter = func(split_index, iterator)
>   File "<stdin>", line 1, in <lambda>
>   File "<stdin>", line 3, in test_func
> RuntimeError: Thrown from test func
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
> 	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
> 	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at scala.collection.Iterator.foreach(Iterator.scala:943)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
> 	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> 	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
> 	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
> 	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
> 	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
> 	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
> 	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
> 	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
> 	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
> 	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
> 	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
> 	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
> 	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
> 	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:136)
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:750)
> 	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
> 	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
> 	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
> 	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> 	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2111)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2857)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
> 	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
> 	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
> 	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
> 	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at py4j.Gateway.invoke(Gateway.java:282)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
> 	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
> 	at java.lang.Thread.run(Thread.java:750)
> {noformat}
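>
> For contrast, a minimal non-barrier sketch (an illustration, not part of the
> original report: it assumes a deployment where task-level retries apply, e.g.
> a cluster or a master like {{local[10, 4]}}; a plain local master does not
> retry failed tasks). Without {{barrier()}}, the same failure is re-attempted
> per task, up to {{spark.task.maxFailures}} times, rather than being expected
> to trigger a retry of the whole stage:
> {code:python}
> # Non-barrier equivalent, reusing test_func and start_rdd from the
> # reproduction above. On a cluster (or with local[10, 4]) the failing
> # partition is re-attempted up to spark.task.maxFailures times.
> result = start_rdd.mapPartitionsWithIndex(lambda i, c: test_func(i))
> result.collect()  # still fails, but only after the per-task retries
> {code}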
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org