[ https://issues.apache.org/jira/browse/SPARK-24407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Takeshi Yamamuro resolved SPARK-24407.
--------------------------------------
    Resolution: Invalid

> Spark + Parquet + Snappy: Overall compression ratio loses after spark shuffles data
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-24407
>                 URL: https://issues.apache.org/jira/browse/SPARK-24407
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 2.1.2
>            Reporter: shining
>            Priority: Major
>
> Please help me understand how to get a better compression ratio with Spark.
> Let me describe the case:
> # I have a dataset, let's call it *product*, on HDFS. It was imported using
> Sqoop ImportTool as-parquet-file with the *snappy* codec. As a result of the
> import, I have 100 files totalling _46 GB_ (du), with different sizes
> (min 11 MB, max 1.5 GB, avg ~500 MB). The total record count is a little more
> than _8 billion_, with _84 columns_.
> # I'm doing a simple read/repartition/write with Spark, using *snappy* as well,
> and as a result I'm getting
> ~*100 GB* of output with the same file count, same codec, same record count,
> and same columns.
> Code snippet:
> {{val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")
> productDF
>   .repartition(100)
>   .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
>   .option("compression", "snappy")
>   .parquet("/processed/product/20180215/04-37/read_repartition_write/general")
> }}
> # Using parquet-tools, I looked into random files from both ingest and
> processed, and they look as follows:
> ingest:
> {{creator: parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber})
> extra: parquet.avro.schema =
> {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of
> QueryResult","fields"
> and almost all columns look like
> AVAILABLE: OPTIONAL INT64 R:0 D:1
> row group 1: RC:3640100 TS:36454739 OFFSET:4
> AVAILABLE: INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100
> ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max:
> 1577692800000, num_nulls: 2541633] }}
> processed:
> {{creator: parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber})
> extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"
> AVAILABLE: OPTIONAL INT64 R:0 D:1
> ...
> row group 1: RC:6660100 TS:243047789 OFFSET:4
> AVAILABLE: INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10
> VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max:
> 10413820800000, num_nulls: 4444993] }}
> On the other hand, without repartition, or when using coalesce, the size
> remains close to the ingest data size.
> # Going forward, I did the following:
> ** read the dataset and write it back with
> {{productDF
>   .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
>   .option("compression", "none")
>   .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
> }}
> ** read the dataset, repartition it, and write it back with
> {{productDF
>   .repartition(500)
>   .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
>   .option("compression", "none")
>   .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")
> }}
> As a result: *80 GB* without and *283 GB* with repartition, with the same
> number of output files.
> 80 GB parquet meta example:
> {{AVAILABLE: INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00
> VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max:
> 2524550400000, num_nulls: 7929352] }}
> 283 GB parquet meta example:
> {{AVAILABLE: INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00
> VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max:
> 10413820800000, num_nulls: 2244255] }}
> It seems that Parquet itself (through its encodings?) greatly reduces the
> size of the data even without compression. How? :)
> I also tried reading the uncompressed 80 GB output, repartitioning it, and
> writing it back, and again got 283 GB.
> * The first question is: why am I getting a bigger size after Spark
> repartitioning/shuffle?
> * The second is: how can I shuffle data in Spark efficiently so that it
> benefits from Parquet encoding/compression, if that is possible at all?
> In general, I don't want my data size to grow after Spark processing
> when I haven't changed anything.
> Also, I could not find whether there is any _configurable compression rate_
> for snappy, e.g. -1 ... -9. As far as I know, gzip has this, but how can this
> rate be controlled in the Spark/Parquet writer?
> I appreciate any help!
> Thanks!
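
The thread was closed without an answer to the two questions above. One commonly offered explanation, which is an assumption here and not something this thread confirms, is that `repartition(n)` spreads rows round-robin, so equal values that used to sit next to each other in a column end up scattered, and the RLE and dictionary encodings listed in the `ENC:` fields have less to collapse. The sketch below is plain Scala with no Spark or Parquet involved: it counts runs of equal adjacent values as a rough proxy for how encodable a column is. The column contents, the sizes, the seed, and the `countRuns` helper are all hypothetical.

```scala
import scala.util.Random

object RunLengthSketch {
  // Count runs of equal adjacent values. Fewer runs means a run-length
  // style encoding has less to store. This is only a proxy, not Parquet's
  // actual RLE/bit-packing hybrid.
  def countRuns[A](values: Seq[A]): Int =
    if (values.isEmpty) 0
    else 1 + values.sliding(2).count {
      case Seq(a, b) => a != b
      case _         => false // single-element window
    }

  // Hypothetical low-cardinality column: 100,000 rows, 10 distinct values,
  // stored so that equal values are adjacent (as an ordered import might).
  val clustered: Vector[Long] =
    Vector.tabulate(100000)(i => (i / 10000).toLong)

  // The same rows in random order, imitating (by assumption) what a
  // round-robin redistribution does to value locality.
  val shuffled: Vector[Long] = new Random(42).shuffle(clustered)

  def main(args: Array[String]): Unit = {
    println(s"runs when clustered:   ${countRuns(clustered)}")
    println(s"runs when shuffled:    ${countRuns(shuffled)}")
    println(s"runs after re-sorting: ${countRuns(shuffled.sorted)}")
  }
}
```

If this explanation applies to the *product* dataset, the corresponding step in Spark would be to restore some ordering before the write, for example by using `repartition` with partition columns followed by `sortWithinPartitions`. Which columns to choose depends on the data and is not established anywhere in this thread.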