[ https://issues.apache.org/jira/browse/SPARK-22982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330943#comment-16330943 ]

Andrew Ash commented on SPARK-22982:
------------------------------------

[~joshrosen] do you have some example stack traces of what this bug can cause? 
Several of our clusters hit what I think is this problem earlier this month; 
see below for details.

 

For a few days in January (4th through 12th) on our AWS infra, we observed 
massively degraded disk read throughput (down to 33% of previous peaks). 
During this time, we also began observing intermittent exceptions from Spark 
when reading parquet files that a previous Spark job had written. When the 
read throughput recovered on the 12th, we stopped observing the exceptions 
and haven't seen them since.

At first we observed this stack trace when reading .snappy.parquet files:
{noformat}
java.lang.RuntimeException: java.io.IOException: could not read page Page [bytes.size=1048641, valueCount=29945, uncompressedSize=1048641] in col [my_column] BINARY
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:493)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:486)
        at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:96)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPage(VectorizedColumnReader.java:486)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:157)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:229)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:398)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
        at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
        at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
        at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: could not read page Page [bytes.size=1048641, valueCount=29945, uncompressedSize=1048641] in col [my_column] BINARY
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPageV1(VectorizedColumnReader.java:562)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.access$000(VectorizedColumnReader.java:47)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:490)
        ... 31 more
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
        at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
        at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
        at org.xerial.snappy.Snappy.uncompress(Snappy.java:547)
        at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:69)
        at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:253)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPageV1(VectorizedColumnReader.java:555)
        ... 33 more{noformat}
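For context, the failing stage was a plain Spark SQL scan of the freshly written .snappy.parquet output feeding an aggregation (per the FileScanRDD and ObjectHashAggregateExec frames above). A minimal sketch of that shape follows; the paths, the aggregated column, and the particular aggregate are illustrative placeholders rather than our actual job:
{noformat}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

// Minimal sketch only -- paths, "other_column", and the choice of aggregate are
// hypothetical placeholders. The relevant part is the read: a plain DataFrame scan
// of Parquet written by an earlier Spark job, which goes through the vectorized
// Parquet reader seen in the trace above.
val spark = SparkSession.builder().appName("parquet-read-sketch").getOrCreate()

val df = spark.read.parquet("s3a://some-bucket/previous-job-output/")
df.groupBy("my_column")                  // column name taken from the error message
  .agg(collect_list("other_column"))     // an aggregation triggers the shuffle stage seen above
  .count()
{noformat}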
We saw a similar exception when attempting to read the file with parquet-tools 
instead of Spark, indicating that the file itself was corrupted rather than the 
read path being at fault:
{noformat}
parquet.io.ParquetDecodingException: Can not read value at 11074077 in block 1 in file file:/path/to/part-00852-4f6b3ec3-ae6d-41ff-919b-a2ef4ea3dfa0-c000.snappy.parquet
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
        at parquet.hadoop.ParquetReader.read(ParquetReader.java:124)
        at parquet.tools.command.CatCommand.execute(CatCommand.java:54)
        at parquet.tools.Main.main(Main.java:219)
Caused by: parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1048598, valueCount=83618, uncompressedSize=1048598] in col [longitude] BINARY
        at parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:568)
        at parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
        at parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:516)
        at parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:513)
        at parquet.column.page.DataPageV1.accept(DataPageV1.java:96)
        at parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:513)
        at parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:505)
        at parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:607)
        at parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:407)
        at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
        ... 3 more
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
        at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
        at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
        at org.xerial.snappy.Snappy.uncompress(Snappy.java:516)
        at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:69)
        at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
        at java.io.DataInputStream.readFully(DataInputStream.java:195)
        at java.io.DataInputStream.readFully(DataInputStream.java:169)
        at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
        at parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:557)
        ... 12 more
Can not read value at 11074077 in block 1 in file file:/path/to/part-00852-4f6b3ec3-ae6d-41ff-919b-a2ef4ea3dfa0-c000.snappy.parquet
{noformat}
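For reference, that parquet-tools check was simply its {{cat}} command run against the affected file (that is where the CatCommand frame above comes from); the launcher shown below is illustrative, since invocations differ between installs:
{noformat}
# Illustrative invocation only -- the jar name/launcher varies by install.
# `cat` is the parquet-tools command whose CatCommand frame appears in the trace above.
java -jar parquet-tools.jar cat file:/path/to/part-00852-4f6b3ec3-ae6d-41ff-919b-a2ef4ea3dfa0-c000.snappy.parquet
{noformat}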
Thinking it might have something to do with Snappy, we set 
{{spark.sql.parquet.compression.codec: gzip}} and then, again intermittently, 
observed this exception instead:
{noformat}
java.lang.IndexOutOfBoundsException
at java.nio.Buffer.checkIndex(Buffer.java:540)
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:139)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readUnsignedVarInt(VectorizedRleValuesReader.java:532)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readNextGroup(VectorizedRleValuesReader.java:588)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readIntegers(VectorizedRleValuesReader.java:467)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readIntegers(VectorizedRleValuesReader.java:438)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:163)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:229)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:398)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){noformat}
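For reference, the codec switch was just that one Spark SQL conf on the writing jobs. A minimal sketch of how we set it, where only the config key/value reflects the actual change and the rest is illustrative:
{noformat}
import org.apache.spark.sql.SparkSession

// Minimal sketch: only the codec setting reflects the change we actually made;
// the app name and setting it at session-build time are illustrative.
val spark = SparkSession.builder()
  .appName("writer-with-gzip-parquet")
  .config("spark.sql.parquet.compression.codec", "gzip")  // previously snappy
  .getOrCreate()

// Equivalent at runtime:
// spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
{noformat}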
 

This occurred in clusters running Spark versions ranging from a build of the 
Apache master branch from a few weeks ago all the way back to Spark 1.6.1. All 
affected clusters were on AWS infra; we did not observe this outside of AWS. 
One cluster saw failures in about 1% of builds.

My hypothesis is that the Spectre/Meltdown patches on AWS, which in an early 
iteration severely degraded I/O performance, led Spark to hold file handles 
open longer than before and therefore to hit this bug more frequently. We 
hadn't seen the exceptions before that window, and once another silent AWS 
patch went out around Jan 12th that restored I/O performance, we stopped 
seeing them.

Do these observations match what your team saw while diving into this bug? Or 
am I totally off-base and latching onto an unrelated issue, hoping it's the 
same as mine?

> Remove unsafe asynchronous close() call from FileDownloadChannel
> ----------------------------------------------------------------
>
>                 Key: SPARK-22982
>                 URL: https://issues.apache.org/jira/browse/SPARK-22982
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0, 2.0.0, 2.1.0, 2.2.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.2.2, 2.3.0
>
>
> Spark's Netty-based file transfer code contains an asynchronous IO bug which 
> may lead to incorrect query results.
> At a high-level, the problem is that an unsafe asynchronous `close()` of a 
> pipe's source channel creates a race condition where file transfer code 
> closes a file descriptor then attempts to read from it. If the closed file 
> descriptor's number has been reused by an `open()` call then this invalid 
> read may cause unrelated file operations to return incorrect results due to 
> reading different data than intended.
> I have a small, surgical fix for this bug and will submit a PR with more 
> description on the specific race condition / underlying bug.


