[ https://issues.apache.org/jira/browse/SPARK-11424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen updated SPARK-11424:
-------------------------------
    Target Version/s: 1.3.2, 1.4.2, 1.5.2, 1.6.0

> Guard against MAPREDUCE-5918 by ensuring RecordReader is only closed once in
> *HadoopRDD
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-11424
>                 URL: https://issues.apache.org/jira/browse/SPARK-11424
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Critical
>
> MAPREDUCE-5918 is a bug where an instance of a decompressor ends up getting
> placed into a pool multiple times. Since the pool is backed by a list instead
> of a set, this can lead to the same decompressor being handed out to multiple
> threads or places at the same time, which is not safe because the two users of
> the shared decompressor will overwrite each other's buffers. Sometimes this
> buffer sharing leads to exceptions, but other times it simply produces
> invalid / garbled results.
> That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop
> versions that we wish to support. As a result, I think that we should work
> around this issue in Spark via defensive programming to prevent RecordReaders
> from being closed multiple times.
> So far, I've had a hard time coming up with explanations of exactly how
> double-close()s occur in practice, but I do have a couple of explanations that
> work on paper.
> For instance, it looks like https://github.com/apache/spark/pull/7424, added
> in 1.5, introduces at least one extremely rare corner-case path where Spark
> could double-close() a LineRecordReader instance in a way that triggers the
> bug. Here are the steps involved in the bad execution that I brainstormed:
> * The task has finished reading input, so we call close():
> https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L168
> * While handling the close call and trying to close the reader,
> reader.close() throws an exception:
> https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L190
> * We don't set reader = null after handling this exception, so the
> TaskCompletionListener also ends up calling NewHadoopRDD.close(), which, in
> turn, closes the record reader again:
> https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L156
> In this hypothetical situation, LineRecordReader.close() could fail with an
> exception if its InputStream failed to close:
> https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java#L212
> I googled for "Exception in RecordReader.close()" and it looks like a closed
> Hadoop FileSystem can trigger an error there:
> * https://spark-project.atlassian.net/browse/SPARK-757
> * https://issues.apache.org/jira/browse/SPARK-2491
> * http://apache-spark-user-list.1001560.n3.nabble.com/Any-issues-with-repartition-td13462.html
> Looking at https://issues.apache.org/jira/browse/SPARK-3052, it seems possible
> to get spurious exceptions there when there is an error reading from Hadoop.
> If the Hadoop FileSystem were to get into an error state _right_ after reading
> the last record, then it looks like we could hit this bug in 1.5. Again, this
> might be really unlikely, but we should modify Spark's code so that we can
> 100% rule it out.
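> Concretely, the kind of guard I have in mind is to hand the reader off and
> null out the field _before_ calling close(), so that a later close attempt
> (e.g. from the TaskCompletionListener) becomes a no-op even if
> reader.close() throws. Here is a simplified, self-contained sketch; the names
> (GuardedReader, etc.) are illustrative and this is not the actual patch:
> {code}
> import org.apache.hadoop.mapreduce.RecordReader
>
> // Illustrative stand-in for the close() logic in *HadoopRDD's iterator.
> // Both call sites (end-of-iteration and the TaskCompletionListener) run
> // on the task thread, so no extra synchronization is assumed here.
> class GuardedReader[K, V](private var reader: RecordReader[K, V]) {
>   def close(): Unit = {
>     if (reader != null) {
>       // Null the field *before* closing: even if close() throws, a
>       // later call can no longer reach the reader, so it cannot be
>       // closed twice (a double-close is what returns a decompressor
>       // to the CodecPool twice and triggers MAPREDUCE-5918).
>       val r = reader
>       reader = null
>       try {
>         r.close()
>       } catch {
>         case e: Exception =>
>           // Mirror the existing behavior: log the failure and move on.
>           System.err.println(s"Exception in RecordReader.close(): $e")
>       }
>     }
>   }
> }
> {code}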
> *TL;DR:* We can rule out one rare but potential cause of stream corruption
> via defensive programming.
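> For reference, the underlying CodecPool hazard is easy to demonstrate on a
> Hadoop version that predates the MAPREDUCE-5918 fix (anything before 2.7):
> returning one decompressor twice, which is exactly what a double-close() of a
> RecordReader does, makes the list-backed pool hand the same instance out to
> two callers. A minimal, hypothetical repro sketch (DefaultCodec is an
> arbitrary choice; on a fixed Hadoop the final check prints false):
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.io.compress.{CodecPool, DefaultCodec}
>
> object DoubleReturnRepro {
>   def main(args: Array[String]): Unit = {
>     val codec = new DefaultCodec
>     codec.setConf(new Configuration())
>
>     // Borrow one decompressor, then (incorrectly) return it twice.
>     val d = CodecPool.getDecompressor(codec)
>     CodecPool.returnDecompressor(d)
>     CodecPool.returnDecompressor(d)
>
>     // With a list-backed pool the instance is now pooled twice, so two
>     // independent borrowers receive the *same* object and will clobber
>     // each other's buffers if they decompress concurrently.
>     val a = CodecPool.getDecompressor(codec)
>     val b = CodecPool.getDecompressor(codec)
>     println(s"same instance handed out twice: ${a eq b}")
>   }
> }
> {code}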