[ https://issues.apache.org/jira/browse/SPARK-22982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330943#comment-16330943 ]
Andrew Ash commented on SPARK-22982:
------------------------------------

[~joshrosen] do you have some example stacktraces of what this bug can cause? Several of our clusters hit what I think is this problem earlier this month; see below for details.

For a few days in January (the 4th through the 12th) on our AWS infra, we observed massively degraded disk read throughput (down to 33% of previous peaks). During this time, we also began observing intermittent exceptions from Spark when reading parquet files that a previous Spark job had written. When the read throughput recovered on the 12th, we stopped observing the exceptions and haven't seen them since.

At first we observed this stacktrace when reading .snappy.parquet files:

{noformat}
java.lang.RuntimeException: java.io.IOException: could not read page Page [bytes.size=1048641, valueCount=29945, uncompressedSize=1048641] in col [my_column] BINARY
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:493)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:486)
    at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:96)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPage(VectorizedColumnReader.java:486)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:157)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:229)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:398)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:101)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: could not read page Page [bytes.size=1048641, valueCount=29945, uncompressedSize=1048641] in col [my_column] BINARY
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPageV1(VectorizedColumnReader.java:562)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.access$000(VectorizedColumnReader.java:47)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:490)
    ... 31 more
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:547)
    at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:69)
    at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:253)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPageV1(VectorizedColumnReader.java:555)
    ...
33 more
{noformat}

We saw a similar exception when attempting to read the file with parquet-tools instead of Spark, indicating that the file itself was corrupted, not the read process:

{noformat}
parquet.io.ParquetDecodingException: Can not read value at 11074077 in block 1 in file file:/path/to/part-00852-4f6b3ec3-ae6d-41ff-919b-a2ef4ea3dfa0-c000.snappy.parquet
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
    at parquet.hadoop.ParquetReader.read(ParquetReader.java:124)
    at parquet.tools.command.CatCommand.execute(CatCommand.java:54)
    at parquet.tools.Main.main(Main.java:219)
Caused by: parquet.io.ParquetDecodingException: could not read page Page [bytes.size=1048598, valueCount=83618, uncompressedSize=1048598] in col [longitude] BINARY
    at parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:568)
    at parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:57)
    at parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:516)
    at parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:513)
    at parquet.column.page.DataPageV1.accept(DataPageV1.java:96)
    at parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:513)
    at parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:505)
    at parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:607)
    at parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:407)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
    ... 3 more
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:516)
    at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:69)
    at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
    at parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:557)
    ... 12 more
Can not read value at 11074077 in block 1 in file file:/path/to/part-00852-4f6b3ec3-ae6d-41ff-919b-a2ef4ea3dfa0-c000.snappy.parquet
{noformat}

Thinking it might have something to do with snappy, we set {{spark.sql.parquet.compression.codec: gzip}} and then observed this exception, again intermittently:

{noformat}
java.lang.IndexOutOfBoundsException
    at java.nio.Buffer.checkIndex(Buffer.java:540)
    at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:139)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readUnsignedVarInt(VectorizedRleValuesReader.java:532)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readNextGroup(VectorizedRleValuesReader.java:588)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readIntegers(VectorizedRleValuesReader.java:467)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readIntegers(VectorizedRleValuesReader.java:438)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:163)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:229)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:398)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{noformat}

This occurred in clusters running Spark versions ranging from a build of the apache master branch only a few weeks old all the way back to Spark 1.6.1. All were in AWS infra; we did not observe this outside of AWS. One cluster saw failures in about 1% of builds. My hypothesis is that the Spectre/Meltdown patches on AWS, which in an early iteration caused large I/O performance degradations, caused Spark to hold file handles open for longer than before and therefore to hit this bug more frequently.
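As a side note, both the {{FAILED_TO_UNCOMPRESS(5)}} and the {{IndexOutOfBoundsException}} failures above are consistent with corrupted page bytes reaching the decompressor while the file's framing still looks plausible. A small illustration of that failure mode, using Python's zlib purely as a stand-in for snappy (this is not Spark's codec path, just the same class of failure):

```python
import zlib

# Stand-in for a parquet data page: compress a repetitive payload.
payload = b"longitude,latitude\n" * 500
page = zlib.compress(payload)

# Simulate on-disk corruption: flip a few bytes in the middle of the
# compressed stream (XOR guarantees the bytes actually change).
corrupt = bytearray(page)
mid = len(corrupt) // 2
for i in range(mid, mid + 4):
    corrupt[i] ^= 0xFF

try:
    zlib.decompress(bytes(corrupt))
    outcome = "decompressed"
except zlib.error:
    # Analogous to FAILED_TO_UNCOMPRESS(5): the codec, not the reader,
    # is the first component to notice the bad bytes.
    outcome = "error"

print(outcome)  # error
```

The point is that the codec fails loudly on flipped bytes, which matches seeing the error both from Spark and from parquet-tools on the same file.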
We hadn't seen these exceptions previously, and when another silent AWS patch went out around Jan 12th that fixed I/O performance, we stopped seeing them. Do these observations match what your team saw while diving into this bug? Or am I totally off-base, latching onto an unrelated issue and hoping it's the same as mine?

> Remove unsafe asynchronous close() call from FileDownloadChannel
> ----------------------------------------------------------------
>
>                 Key: SPARK-22982
>                 URL: https://issues.apache.org/jira/browse/SPARK-22982
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0, 2.0.0, 2.1.0, 2.2.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.2.2, 2.3.0
>
>
> Spark's Netty-based file transfer code contains an asynchronous IO bug which may lead to incorrect query results.
>
> At a high level, the problem is that an unsafe asynchronous `close()` of a pipe's source channel creates a race condition in which the file transfer code closes a file descriptor and then attempts to read from it. If the closed file descriptor's number has been reused by an `open()` call, this invalid read may cause unrelated file operations to return incorrect results, because they read different data than intended.
>
> I have a small, surgical fix for this bug and will submit a PR with more description of the specific race condition / underlying bug.
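The descriptor-reuse race described in the issue can be reproduced outside Spark in a few lines. A minimal sketch (plain POSIX file descriptors via Python's os module, not Spark's Netty code): once a descriptor number is closed, the next `open()` in the process is guaranteed to receive the lowest free number, so a reader still holding the stale number silently reads the wrong file's bytes.

```python
import os
import tempfile

# Two scratch files with different, recognizable contents.
tmpdir = tempfile.mkdtemp()
path_a = os.path.join(tmpdir, "a.bin")
path_b = os.path.join(tmpdir, "b.bin")
with open(path_a, "wb") as f:
    f.write(b"AAAA")
with open(path_b, "wb") as f:
    f.write(b"BBBB")

stale_fd = os.open(path_a, os.O_RDONLY)  # the reader's descriptor for file A
os.close(stale_fd)                       # the "asynchronous" close frees the number

# An unrelated open() in the same process grabs the lowest free descriptor
# number, which POSIX guarantees is the one just released.
new_fd = os.open(path_b, os.O_RDONLY)

# A reader that still holds the stale number now reads file B's bytes,
# with no error raised anywhere -- exactly the silent-corruption hazard.
data = os.read(stale_fd, 4)
print(new_fd == stale_fd, data)  # True b'BBBB'
os.close(new_fd)
```

This is why the symptom shows up as corrupt data in an unrelated file rather than as an exception at the point of the buggy close.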