[ https://issues.apache.org/jira/browse/SPARK-27267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16801366#comment-16801366 ]
Taro L. Saito commented on SPARK-27267:
---------------------------------------

Just released snappy-java 1.1.7.3 with this hot fix. https://github.com/xerial/snappy-java

> spark 2.4 uses 1.1.7.x snappy-java, but its behavior is different from 1.1.2.x
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-27267
>                 URL: https://issues.apache.org/jira/browse/SPARK-27267
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.4.0
>         Environment: spark.rdd.compress=true
>                      spark.io.compression.codec=snappy
>                      spark 2.4 on hadoop 2.6 with hive
>            Reporter: Max Xie
>            Priority: Minor
>
> I use pyspark like this:
> ```
> from pyspark.storagelevel import StorageLevel
> df = spark.sql("select * from xzn.person")
> df.persist(StorageLevel(False, True, False, False))
> df.count()
> ```
> Table person is a simple table stored as ORC files, and some of the ORC files are empty. When I run the query, it throws this error:
> ```
> 19/03/22 21:46:31 INFO MemoryStore:54 - Block rdd_2_1 stored as values in memory (estimated size 0.0 B, free 1662.6 MB)
> 19/03/22 21:46:31 INFO FileScanRDD:54 - Reading File path: viewfs://name/xzn.db/person/part-00011, range: 0-49, partition values: [empty row]
> 19/03/22 21:46:31 INFO FileScanRDD:54 - Reading File path: viewfs://name/xzn.db/person/part-00011_copy_1, range: 0-49, partition values: [empty row]
> 19/03/22 21:46:31 INFO FileScanRDD:54 - Reading File path: viewfs://name/xzn.db/person/part-00012, range: 0-49, partition values: [empty row]
> 19/03/22 21:46:31 INFO FileScanRDD:54 - Reading File path: viewfs://name/xzn.db/person/part-00012_copy_1, range: 0-49, partition values: [empty row]
> 19/03/22 21:46:31 INFO FileScanRDD:54 - Reading File path: viewfs://name/xzn.db/person/part-00013, range: 0-49, partition values: [empty row]
> 19/03/22 21:46:31 ERROR Executor:91 - Exception in task 1.0 in stage 0.0 (TID 1)
> org.xerial.snappy.SnappyIOException: [EMPTY_INPUT] Cannot decompress empty stream
>   at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:94)
>   at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
>   at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:164)
>   at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
>   at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
>   at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:596)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:886)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.run(Task.scala:121)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> ```
> After searching, I found that snappy-java 1.1.7.x behaves differently from 1.1.2.x (the version Spark 2.0.2 used). SnappyOutputStream in 1.1.2.x always writes the snappy stream header, whether or not any value is written, but SnappyOutputStream in 1.1.7.x does not generate the header if nothing is written into it. So in Spark 2.4, when an RDD caches an empty value, MemoryStore stores no bytes at all (no snappy header), and decompressing that block later throws the empty-stream error.
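A minimal library-level sketch of the difference described above (an editor's illustration, not part of the original report; it assumes the reported behavior of snappy-java 1.1.7.0-1.1.7.2): closing a SnappyOutputStream without writing anything produces zero output bytes, and constructing a SnappyInputStream over those bytes then fails with the same [EMPTY_INPUT] error seen in the executor log, while 1.1.2.x (and, per the comment above, the 1.1.7.3 hot fix) still emits the stream header for empty input.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;

public class EmptySnappyStreamRepro {
    public static void main(String[] args) throws Exception {
        // Open and close a SnappyOutputStream without writing any data,
        // which is what happens when Spark caches an empty RDD partition
        // with spark.rdd.compress=true.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        new SnappyOutputStream(sink).close();

        byte[] cached = sink.toByteArray();
        // Reported behavior: 1.1.2.x writes the stream header here, while
        // 1.1.7.0-1.1.7.2 write nothing, so cached.length is 0.
        System.out.println("bytes written for an empty stream: " + cached.length);

        // SnappyInputStream reads the header in its constructor, so on
        // 1.1.7.0-1.1.7.2 this throws
        // SnappyIOException: [EMPTY_INPUT] Cannot decompress empty stream.
        try (SnappyInputStream in = new SnappyInputStream(new ByteArrayInputStream(cached))) {
            System.out.println("first read returned: " + in.read());
        } catch (Exception e) {
            System.out.println("decompression failed: " + e);
        }
    }
}
```

Spark reaches the same constructor through SnappyCompressionCodec.compressedInputStream, as shown in the stack trace above.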
>
> Maybe we can change SnappyOutputStream in 1.1.7.x snappy-java to fix it. Here is my patched compressInput method:
> ```
> protected void compressInput()
>         throws IOException
> {
>     // Generate the header up front, even if no data has been written yet,
>     // so that an empty stream still starts with a valid snappy header.
>     if (!headerWritten) {
>         outputCursor = writeHeader();
>         headerWritten = true;
>     }
>     if (inputCursor <= 0) {
>         return; // no need to dump
>     }
>     // The header was previously generated here, after the early return above:
>     // if (!headerWritten) {
>     //     outputCursor = writeHeader();
>     //     headerWritten = true;
>     // }
>     // Compress and dump the buffer content
>     if (!hasSufficientOutputBufferFor(inputCursor)) {
>         dumpOutput();
>     }
>     writeBlockPreemble();
>     int compressedSize = Snappy.compress(inputBuffer, 0, inputCursor, outputBuffer, outputCursor + 4);
>     // Write compressed data size
>     writeInt(outputBuffer, outputCursor, compressedSize);
>     outputCursor += 4 + compressedSize;
>     inputCursor = 0;
> }
> ```
>


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org