Josh Rosen created SPARK-11424:
----------------------------------

             Summary: Guard against MAPREDUCE-5918 by ensuring RecordReader is 
only closed once in *HadoopRDD
                 Key: SPARK-11424
                 URL: https://issues.apache.org/jira/browse/SPARK-11424
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.5.1, 1.5.0
            Reporter: Josh Rosen
            Priority: Critical


MAPREDUCE-5918 is a bug where a single decompressor instance ends up getting 
placed into the codec pool multiple times. Since the pool is backed by a list 
instead of a set, the same decompressor can then be handed out to multiple 
threads or readers at the same time, which is not safe because they will 
overwrite each other's buffers. Sometimes this buffer sharing leads to 
exceptions, but other times it just results in invalid / garbled results.
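
To make the failure mode concrete, here is a minimal, self-contained sketch of 
why a list-backed pool breaks under a double-return. This is a stand-in, not 
Hadoop's actual CodecPool; all names here are made up for illustration.

{code:scala}
import scala.collection.mutable

final class FakeDecompressor(val id: Int)

// A list-backed pool with no duplicate check, mimicking the problematic
// pool behaviour described above (stand-in only, not Hadoop code).
object ListBackedPool {
  private val pool = mutable.ListBuffer.empty[FakeDecompressor]

  def returnDecompressor(d: FakeDecompressor): Unit = pool += d
  def getDecompressor(): Option[FakeDecompressor] =
    if (pool.nonEmpty) Some(pool.remove(0)) else None
}

object DoubleReturnDemo {
  def main(args: Array[String]): Unit = {
    val d = new FakeDecompressor(1)
    // A double-close() returns the same instance to the pool twice...
    ListBackedPool.returnDecompressor(d)
    ListBackedPool.returnDecompressor(d)
    // ...so two independent readers can now be handed the same decompressor
    // and will overwrite each other's buffers.
    val first = ListBackedPool.getDecompressor().get
    val second = ListBackedPool.getDecompressor().get
    assert(first eq second)  // same object handed out twice
  }
}
{code}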

That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop 
versions that we wish to support. As a result, I think that we should try to 
work around this issue in Spark via defensive programming to prevent 
RecordReaders from being closed multiple times.
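
As a rough illustration of the kind of defensive programming I have in mind, 
here is a hedged sketch of an idempotent close() guard that drops the reader 
reference on the first call; the names are illustrative, not the actual Spark 
code.

{code:scala}
import java.io.Closeable

// Sketch of a guard that makes close() safe to call more than once: the
// reader reference is cleared on the first close(), so a later call (e.g.
// from the TaskCompletionListener) cannot reach the RecordReader again.
class GuardedReaderHolder(private var reader: Closeable) {
  def close(): Unit = {
    if (reader != null) {
      try {
        reader.close()  // may throw, e.g. if the underlying stream is bad
      } finally {
        // Clear the reference even if close() threw, so no second call can
        // re-close the reader (and re-return its pooled decompressor).
        reader = null
      }
    }
  }
}
{code}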

So far, I've had a hard time pinning down exactly how double-close()s occur in 
practice, but I do have a couple of scenarios that work on paper.

For instance, it looks like https://github.com/apache/spark/pull/7424, added in 
1.5, introduces at least one extremely rare corner-case path where Spark could 
double-close() a LineRecordReader instance in a way that triggers the bug. Here 
are the steps in the bad execution that I came up with (a minimal sketch of 
this path follows the list):

* The task has finished reading input, so we call close(): 
https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L168.
* While handling the close call and trying to close the reader, reader.close() 
throws an exception: 
https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L190
* We don't set reader = null after handling this exception, so the 
TaskCompletionListener also ends up calling NewHadoopRDD.close(), which, in 
turn, closes the record reader again: 
https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L156
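
Here is the minimal sketch referenced above: a swallowed exception in the first 
close() plus a missing reader = null leaves the reader reachable, so the 
completion callback closes it a second time. All names are assumed for 
illustration; this is not the actual NewHadoopRDD code.

{code:scala}
import java.io.{Closeable, IOException}

object DoubleCloseDemo {
  // A reader whose first close() fails, e.g. because the underlying
  // FileSystem stream is already in an error state.
  final class FlakyReader extends Closeable {
    var closeCount = 0
    override def close(): Unit = {
      closeCount += 1
      if (closeCount == 1) throw new IOException("error closing stream")
    }
  }

  def main(args: Array[String]): Unit = {
    val flaky = new FlakyReader()
    var reader: Closeable = flaky

    def close(): Unit = {
      if (reader != null) {
        try reader.close()
        catch { case e: Exception => println(s"Exception in RecordReader.close(): $e") }
        // Pre-fix behaviour: reader is NOT set to null after the exception.
      }
    }

    close()  // end-of-input close: throws, exception is logged and swallowed
    close()  // TaskCompletionListener fires: the same reader is closed again
    println(s"close() was called ${flaky.closeCount} times")  // prints 2
  }
}
{code}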

In this hypothetical situation, LineRecordReader.close() could fail with an 
exception if its InputStream failed to close: 
https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java#L212
I googled for "Exception in RecordReader.close()" and it looks like it's 
possible for a closed Hadoop FileSystem to trigger an error there:

* https://spark-project.atlassian.net/browse/SPARK-757
* https://issues.apache.org/jira/browse/SPARK-2491
* 
http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-td13462.html

Looking at https://issues.apache.org/jira/browse/SPARK-3052, it seems like it's 
possible to get spurious exceptions there when there is an error reading from 
Hadoop. If the Hadoop FileSystem were to get into an error state _right_ after 
reading the last record, then it looks like we could hit the bug here in 1.5. 
Again, this might be really unlikely, but we should modify Spark's code so that 
we can rule it out completely.

*TL;DR:* We can rule out one rare but potential cause of stream corruption via 
defensive programming.



