Josh Rosen created SPARK-19685:
----------------------------------

             Summary: PipedRDD tasks should not hang on interruption / errors
                 Key: SPARK-19685
                 URL: https://issues.apache.org/jira/browse/SPARK-19685
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.1.0, 2.0.0, 1.6.0
            Reporter: Josh Rosen


While looking at WARN and ERROR-level logs from Spark executors, I spotted a 
problem where PipedRDD tasks may continue running after being cancelled or 
after failing. Specifically, I saw many cancelled tasks hanging with one of the 
following two stack traces:

{code}
java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
java.io.FilterOutputStream.close(FilterOutputStream.java:158)
java.lang.UNIXProcess.destroy(UNIXProcess.java:445)
java.lang.UNIXProcess.destroy(UNIXProcess.java:478)
org.apache.spark.rdd.PipedRDD$$anon$1.propagateChildException(PipedRDD.scala:203)
org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:183)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:212)
scala.collection.AbstractIterator.fold(Iterator.scala:1336)
org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}

and 

{code}
java.io.FileInputStream.readBytes(Native Method)
java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
java.io.BufferedInputStream.read(BufferedInputStream.java:345)
sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
java.io.InputStreamReader.read(InputStreamReader.java:184)
java.io.BufferedReader.fill(BufferedReader.java:161)
java.io.BufferedReader.readLine(BufferedReader.java:324)
java.io.BufferedReader.readLine(BufferedReader.java:389)
scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:172)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
scala.collection.Iterator$class.foreach(Iterator.scala:893)
scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
scala.collection.TraversableOnce$class.fold(TraversableOnce.scala:212)
scala.collection.AbstractIterator.fold(Iterator.scala:1336)
org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
org.apache.spark.rdd.RDD$$anonfun$fold$1$$anonfun$20.apply(RDD.scala:1086)
org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1980)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}

I do not have a minimal reproduction of this issue yet, but I suspect that we 
can construct one by having PipedRDD invoke a process that hangs indefinitely 
without printing any output, then cancelling the Spark job with 
{{interruptOnCancel=true}}. If my hunch is right, we should see the PipedRDD 
tasks continue to run, either because the call to destroy the child process 
hangs or because we never check whether the task has been interrupted. 
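
Below is a rough, untested sketch of the reproduction I have in mind. The job 
group name, partition count, and the {{sleep}} command are arbitrary choices 
made for illustration, not something taken from an existing test:

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Untested sketch of the suspected reproduction. The child process ("sleep")
// blocks forever without producing any output, so the PipedRDD reader thread
// sits in BufferedReader.readLine until the process exits.
object PipedRDDHangRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PipedRDDHangRepro"))

    val job = new Thread(new Runnable {
      override def run(): Unit = {
        // interruptOnCancel = true asks the executors to interrupt the task
        // threads when this job group is cancelled.
        sc.setJobGroup("piped-rdd-hang", "PipedRDD cancellation repro",
          interruptOnCancel = true)
        try {
          sc.parallelize(1 to 4, 4).pipe("sleep 100000").count()
        } catch {
          case _: Throwable => // expected once the job group is cancelled
        }
      }
    })
    job.start()

    Thread.sleep(10000)  // give the tasks time to launch their child processes
    sc.cancelJobGroup("piped-rdd-hang")

    // Now inspect the executor thread dumps (e.g. via the web UI). If the
    // hunch is right, the task threads are still alive in
    // PipedRDD$$anon$1.hasNext or UNIXProcess.destroy despite the interrupt.
    Thread.sleep(30000)
    sc.stop()
  }
}
{code}

If the hunch is right, executor thread dumps taken after the cancellation 
should still show task threads stuck in the stacks pasted above.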


