[ https://issues.apache.org/jira/browse/SPARK-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14521898#comment-14521898 ]
Matt Massie commented on SPARK-7263:
------------------------------------

Our team is using Spark and Parquet for handling large genomics datasets (see http://bdgenomics.org). Our schemas are mostly flat (instead of nested) and many of the fields are highly redundant and compress very well in Parquet. Our Parquet files are typically ~10-20% smaller than data stored in the most popular genome-specific compressed binary format, called BAM.

When we first load our Parquet files, Parquet uses dictionaries to appreciably reduce the memory footprint of our objects. For example, one field might hold the string name of the chromosome (humans have 23 pairs), so the millions of records we load all point to about a dozen references in a dictionary. However, once this data undergoes a shuffle in Spark, the dictionaries are lost and the data is serialized over the network and stored on disk as row-oriented records. When the reducers load this record-oriented intermediate data, we no longer have ~23 string references but rather millions of duplicated strings. This memory pressure is swamping our reducers. With Parquet integrated into the shuffle, memory usage on the reducers drops dramatically, since we don't lose our Parquet dictionaries. While more efficient memory usage was the primary motivator, you're correct that the on-disk sizes for the Parquet column-compressed format will often be smaller than compression alone. This work also allows users to tweak Parquet compression, block/page size, etc. to match their needs.

I tried to find a way to build this using only the Spark shuffle manager APIs, but it wasn't possible. As you've noted, I needed to make changes to some of the core code: FileShuffleBlockManager.forMapTask and ShuffleBlockFetcherIterator. The ShuffleGroup creation in FileShuffleBlockManager.forMapTask could be generalized to allow ShuffleGroups to be created by the configured ShuffleManager; for now, it requires custom shuffle implementors to modify the FileShuffleBlockManager method directly. The ShuffleBlockFetcherIterator relies on the configured Spark serializer to fetch and iterate over records. The serializer interface is strictly byte-oriented and assumes data arrives as records. Parquet doesn't have a streaming interface, since it needs to seek within a file to find metadata sections. I modified the ShuffleBlockFetcherIterator so that, when the ParquetShuffleManager is in use, a local temp block is created and the remote block is copied into it byte-for-byte before the Avro Parquet reader iterates over the records. Interestingly, the Parquet shuffle doesn't use or need the configured Spark serializer, since it's only copying and reading files directly.

There is some interesting future work around using Parquet projection and push-down predicates between Spark stages, which could further reduce serialization costs and improve performance (e.g. skipping entire blocks of shuffle data, deserializing only predicate fields, etc.).
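For reference, enabling the prototype and tuning its Parquet settings from a job looks roughly like this. This is only a sketch: the configuration keys are the ones the prototype adds (they are listed in the issue description below), but the values shown are illustrative and should be checked against the branch.

    import org.apache.spark.{SparkConf, SparkContext}

    // Enable the prototype Parquet shuffle manager and tune its Parquet settings.
    // The keys come from the prototype; the values here are only illustrative.
    val conf = new SparkConf()
      .setAppName("parquet-shuffle-example")
      .set("spark.shuffle.manager", "parquet")
      .set("spark.shuffle.parquet.compression", "SNAPPY")                  // Parquet compression codec
      .set("spark.shuffle.parquet.blocksize", (64 * 1024 * 1024).toString) // Parquet block (row group) size
      .set("spark.shuffle.parquet.pagesize", (1024 * 1024).toString)       // Parquet page size
      .set("spark.shuffle.parquet.enabledictionary", "true")               // keep dictionary encoding across the shuffle

    val sc = new SparkContext(conf)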
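To make the reducer-side change a bit more concrete, the idea is roughly the following. This is a standalone sketch, not the code from the prototype branch (the real change lives inside ShuffleBlockFetcherIterator, and the Parquet/Avro package names depend on the Parquet version in use):

    import java.io.{File, FileOutputStream, InputStream}
    import org.apache.avro.generic.GenericRecord
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.AvroParquetReader

    // Copy a remote shuffle block byte-for-byte into a local temp file, then let
    // the Avro Parquet reader iterate over its records. Parquet needs a seekable
    // file because its metadata footer cannot be read from a forward-only stream.
    // (Cleanup of the temp file and reader is omitted for brevity.)
    def readRemoteParquetBlock(remote: InputStream): Iterator[GenericRecord] = {
      val tmp = File.createTempFile("parquet-shuffle-block", ".parquet")
      val out = new FileOutputStream(tmp)
      try {
        val buf = new Array[Byte](64 * 1024)
        Iterator.continually(remote.read(buf)).takeWhile(_ != -1).foreach(n => out.write(buf, 0, n))
      } finally {
        out.close()
      }

      // The reader returns null once it runs out of records.
      val reader = AvroParquetReader.builder[GenericRecord](new Path(tmp.getAbsolutePath)).build()
      Iterator.continually(reader.read()).takeWhile(_ != null)
    }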
> Add new shuffle manager which stores shuffle blocks in Parquet
> --------------------------------------------------------------
>
> Key: SPARK-7263
> URL: https://issues.apache.org/jira/browse/SPARK-7263
> Project: Spark
> Issue Type: New Feature
> Components: Block Manager
> Reporter: Matt Massie
>
> I have a working prototype of this feature that can be viewed at
> https://github.com/apache/spark/compare/master...massie:parquet-shuffle?expand=1
> Setting the "spark.shuffle.manager" to "parquet" enables this shuffle manager.
> The dictionary support that Parquet provides appreciably reduces the amount of
> memory that objects use; however, once Parquet data is shuffled, all the
> dictionary information is lost and the column-oriented data is written to shuffle
> blocks in a record-oriented fashion. This shuffle manager addresses this issue
> by reading and writing all shuffle blocks in the Parquet format.
> If shuffle objects are Avro records, then the Avro schema (SCHEMA$) is converted
> to a Parquet schema and used directly; otherwise, the Parquet schema is generated
> via reflection.
> Currently, the only non-Avro keys supported are primitive types. The reflection
> code can be improved (or replaced) to support complex records.
> The ParquetShufflePair class allows the shuffle key and value to be stored in
> Parquet blocks as a single record with a single schema.
> This commit adds the following new Spark configuration options:
> "spark.shuffle.parquet.compression" - sets the Parquet compression codec
> "spark.shuffle.parquet.blocksize" - sets the Parquet block size
> "spark.shuffle.parquet.pagesize" - sets the Parquet page size
> "spark.shuffle.parquet.enabledictionary" - turns dictionary encoding on/off
> Parquet does not (and has no plans to) support a streaming API. Metadata sections
> are scattered throughout a Parquet file, making a streaming API difficult. As such,
> the ShuffleBlockFetcherIterator has been modified to fetch the entire contents
> of map outputs into temporary blocks before loading the data into the reducer.
> Interesting future asides:
> o There is no need to define a data serializer (although Spark requires it)
> o Parquet supports predicate pushdown and projection, which could be used
> between shuffle stages to improve performance in the future
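As a rough illustration of the "single record with a single schema" idea behind ParquetShufflePair described above, the key/value pairing can be expressed in Avro terms like this (the names here are illustrative only; the prototype's actual class may differ):

    import org.apache.avro.{Schema, SchemaBuilder}
    import org.apache.avro.generic.{GenericData, GenericRecord}

    // Wrap the shuffle key and value in a single Avro record so every Parquet row
    // holds a complete (key, value) pair under one schema.
    def pairSchema(keySchema: Schema, valueSchema: Schema): Schema =
      SchemaBuilder.record("ShufflePair").fields()
        .name("key").`type`(keySchema).noDefault()
        .name("value").`type`(valueSchema).noDefault()
        .endRecord()

    def toPairRecord(schema: Schema, key: AnyRef, value: AnyRef): GenericRecord = {
      val rec = new GenericData.Record(schema)
      rec.put("key", key)
      rec.put("value", value)
      rec
    }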