[ https://issues.apache.org/jira/browse/SPARK-19685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876828#comment-15876828 ]
Josh Rosen commented on SPARK-19685:
------------------------------------

By the way, one simple fix here might be to use the "soft-kill then hard-kill after timeout" helper function in {{Utils}} to terminate the child process.

> PipedRDD tasks should not hang on interruption / errors
> -------------------------------------------------------
>
>                 Key: SPARK-19685
>                 URL: https://issues.apache.org/jira/browse/SPARK-19685
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0, 2.0.0, 2.1.0
>            Reporter: Josh Rosen
>
> While looking at WARN and ERROR-level logs from Spark executors, I spotted a
> problem where PipedRDD tasks may continue running after being cancelled or
> after failing. Specifically, I saw many cancelled tasks hanging in the
> following stacks:
> {code}
> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
> java.io.FilterOutputStream.close(FilterOutputStream.java:158)
> java.lang.UNIXProcess.destroy(UNIXProcess.java:445)
> java.lang.UNIXProcess.destroy(UNIXProcess.java:478)
> org.apache.spark.rdd.PipedRDD$$anon$1.propagateChildException(PipedRDD.scala:203)
> org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:183)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> scala.collection.Iterator$class.foreach(Iterator.scala:893)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:212)
> scala.collection.AbstractIterator.fold(Iterator.scala:1336)
> org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
> org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
> org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> org.apache.spark.scheduler.Task.run(Task.scala:99)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> and
> {code}
> java.io.FileInputStream.readBytes(Native Method)
> java.io.FileInputStream.read(FileInputStream.java:255)
> java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
> java.io.BufferedInputStream.read(BufferedInputStream.java:345)
> sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
> sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
> sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
> java.io.InputStreamReader.read(InputStreamReader.java:184)
> java.io.BufferedReader.fill(BufferedReader.java:161)
> java.io.BufferedReader.readLine(BufferedReader.java:324)
> java.io.BufferedReader.readLine(BufferedReader.java:389)
> scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
> org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:172)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> scala.collection.Iterator$class.foreach(Iterator.scala:893)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
> scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:212)
> scala.collection.AbstractIterator.fold(Iterator.scala:1336)
> org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
> org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
> org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> org.apache.spark.scheduler.Task.run(Task.scala:99)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> {code}
> I do not have a minimal reproduction of this issue yet, but I suspect that we
> can make one by having PipedRDD call a process which hangs indefinitely
> without printing any output, then cancel the Spark job with
> {{interruptOnCancel=true}}. If my hunch is right, we should witness the
> PipedRDD tasks continuing to run either because the call to destroy the child
> process is hanging or because we don't check whether the task has been
> interrupted.
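
For illustration, the "soft-kill then hard-kill after timeout" idea from the comment could look roughly like the sketch below. The comment does not give the name or signature of the helper in {{Utils}}, so {{ProcessKiller}}, {{terminate}}, and {{timeoutMs}} are hypothetical names, and the sketch relies only on standard {{java.lang.Process}} methods (Java 8+). It is not Spark's actual implementation.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for illustration only; this is not Spark's Utils code.
final class ProcessKiller {
    private ProcessKiller() {}

    /**
     * Requests a normal shutdown, waits up to timeoutMs for the child to exit,
     * and force-kills it if it is still running after that.
     * Returns true if the process has exited by the time this method returns.
     */
    static boolean terminate(Process process, long timeoutMs) {
        // Soft kill: on Unix-like JVMs this typically sends SIGTERM.
        process.destroy();
        try {
            if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
                return true;
            }
            // Hard kill: the child ignored the soft kill within the timeout.
            process.destroyForcibly();
            return process.waitFor(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Do not leave the child behind if the waiting thread is interrupted.
            process.destroyForcibly();
            Thread.currentThread().interrupt();
            return !process.isAlive();
        }
    }
}
```

A caller would use something like {{ProcessKiller.terminate(proc, 10000)}} in place of a bare {{proc.destroy()}}. Note that the first stack trace above shows the task thread blocked inside {{destroy()}} itself, so a helper along these lines may not be enough on its own if that call is what hangs.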