[ https://issues.apache.org/jira/browse/SPARK-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matt Massie updated SPARK-7263:
-------------------------------
    Description: 
I have a working prototype of this feature that can be viewed at
https://github.com/apache/spark/compare/master...massie:parquet-shuffle?expand=1

Setting the "spark.shuffle.manager" to "parquet" enables this shuffle manager.

The dictionary support that Parquet provides appreciably reduces the amount of
memory that objects use; however, once Parquet data is shuffled, all the
dictionary information is lost and the column-oriented data is written to
shuffle blocks in a record-oriented fashion. This shuffle manager addresses
this issue by reading and writing all shuffle blocks in the Parquet format.
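
To make the point about dictionary information concrete, here is a minimal
Java sketch of dictionary-encoding one column. It is not Parquet's encoder;
the class and method names are hypothetical, and it only illustrates why
repeated values are cheap to store column-wise and why that saving disappears
once each record is written out whole.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not Parquet's actual dictionary encoder.
final class DictionaryColumnSketch {
    final List<String> dictionary = new ArrayList<>(); // distinct values, first-seen order
    final List<Integer> ids = new ArrayList<>();       // one small id per row

    // Replace every value in the column with an index into the dictionary.
    static DictionaryColumnSketch encode(List<String> column) {
        DictionaryColumnSketch out = new DictionaryColumnSketch();
        Map<String, Integer> index = new HashMap<>();
        for (String value : column) {
            Integer id = index.get(value);
            if (id == null) {
                id = out.dictionary.size();
                index.put(value, id);
                out.dictionary.add(value);
            }
            out.ids.add(id);
        }
        return out;
    }

    // Rebuild the original column from dictionary + ids.
    List<String> decode() {
        List<String> column = new ArrayList<>(ids.size());
        for (int id : ids) {
            column.add(dictionary.get(id));
        }
        return column;
    }

    public static void main(String[] args) {
        List<String> column = Arrays.asList("chr1", "chr1", "chr2", "chr1", "chr2");
        DictionaryColumnSketch encoded = encode(column);
        System.out.println("dictionary = " + encoded.dictionary);
        System.out.println("ids        = " + encoded.ids);
        System.out.println("round trip = " + encoded.decode().equals(column));
    }
}
```

A record-oriented writer would store the full string in every row instead of
the small id, which is the loss described above.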

If shuffle objects are Avro records, then the Avro SCHEMA$ is converted to a
Parquet schema and used directly; otherwise, the Parquet schema is generated
via reflection. Currently, the only non-Avro keys supported are primitive
types. The reflection code can be improved (or replaced) to support complex
records.

The ParquetShufflePair class allows the shuffle key and value to be stored in
Parquet blocks as a single record with a single schema.
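
The single-record idea can be sketched roughly as follows. This is not the
prototype's ParquetShufflePair or its reflection code: the class name, the
message name "ShufflePair", the field names "key" and "value", and the set of
supported types are all assumptions made for illustration. The sketch renders
one Parquet message schema, in the textual schema syntax, that holds both the
key and the value when both are primitive types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one Parquet message schema for a (key, value) pair.
// Only boxed Java primitives are handled; anything else is rejected.
final class PairSchemaSketch {
    private static final Map<Class<?>, String> PRIMITIVES = new HashMap<>();
    static {
        PRIMITIVES.put(Boolean.class, "boolean");
        PRIMITIVES.put(Integer.class, "int32");
        PRIMITIVES.put(Long.class, "int64");
        PRIMITIVES.put(Float.class, "float");
        PRIMITIVES.put(Double.class, "double");
    }

    // Render a single required field, e.g. "  required int32 key;".
    static String fieldFor(String name, Class<?> type) {
        String parquetType = PRIMITIVES.get(type);
        if (parquetType == null) {
            throw new IllegalArgumentException("Unsupported type: " + type.getName());
        }
        return "  required " + parquetType + " " + name + ";\n";
    }

    // Key and value share one schema, so a pair is stored as one record.
    static String schemaFor(Class<?> keyType, Class<?> valueType) {
        return "message ShufflePair {\n"
            + fieldFor("key", keyType)
            + fieldFor("value", valueType)
            + "}\n";
    }

    public static void main(String[] args) {
        System.out.print(schemaFor(Integer.class, Double.class));
    }
}
```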

This commit adds the following new Spark configuration options:
"spark.shuffle.parquet.compression" - sets the Parquet compression codec
"spark.shuffle.parquet.blocksize" - sets the Parquet block size
"spark.shuffle.parquet.pagesize" - sets the Parquet page size
"spark.shuffle.parquet.enabledictionary" - turns dictionary encoding on/off
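
As a rough sketch of how these options might be supplied, the Java snippet
below collects them as key/value pairs and renders them as "--conf" arguments
for spark-submit. Only the option names and the "parquet" manager value come
from this description; every other value is a placeholder, and the accepted
values and units are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: gathers the settings named above in one place.
final class ParquetShuffleConfSketch {
    static Map<String, String> settings() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.shuffle.manager", "parquet"); // value taken from the description
        // Placeholder values below: accepted values and units are assumptions.
        conf.put("spark.shuffle.parquet.compression", "gzip");
        conf.put("spark.shuffle.parquet.blocksize", "134217728");
        conf.put("spark.shuffle.parquet.pagesize", "1048576");
        conf.put("spark.shuffle.parquet.enabledictionary", "true");
        return conf;
    }

    // Render each setting as a "--conf key=value" pair of arguments.
    static List<String> toSubmitArgs(Map<String, String> conf) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            args.add("--conf");
            args.add(e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", toSubmitArgs(settings())));
    }
}
```

The same pairs could instead be passed one at a time to SparkConf.set(key,
value) when building the application's configuration.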

Parquet does not (and has no plans to) support a streaming API. Metadata
sections are scattered through a Parquet file, which makes a streaming API
difficult. As such, the ShuffleBlockFetcherIterator has been modified to fetch
the entire contents of map outputs into temporary blocks before loading the
data into the reducer.
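
The fetch-everything-first approach can be illustrated with a small Java
sketch. It is not the modified ShuffleBlockFetcherIterator; the class and
method names are hypothetical. It relies on one fact about the Parquet file
layout: a file ends with the footer metadata, a 4-byte little-endian footer
length, and the magic bytes "PAR1", so a reader has to seek to the end of the
data before it can interpret anything else. The sketch spools a stream to a
temporary file and then reads the footer length with random access.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: not the code from the prototype.
final class SpoolThenReadSketch {
    // Copy the whole incoming block to a local temp file so it can be seeked.
    static Path spool(InputStream in) throws IOException {
        Path tmp = Files.createTempFile("shuffle-block-", ".parquet");
        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
        return tmp;
    }

    // Read the trailing 8 bytes: <4-byte little-endian footer length> "PAR1".
    static int footerLength(Path file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            if (raf.length() < 12) {
                throw new IOException("Too short to be a Parquet file");
            }
            byte[] tail = new byte[8];
            raf.seek(raf.length() - 8);
            raf.readFully(tail);
            String magic = new String(tail, 4, 4, StandardCharsets.US_ASCII);
            if (!"PAR1".equals(magic)) {
                throw new IOException("Missing PAR1 magic at end of file");
            }
            return ByteBuffer.wrap(tail, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
        }
    }

    // Build a fake block with the right framing; the footer body is all zeros.
    static byte[] fakeBlock(int footerLen) {
        ByteBuffer buf = ByteBuffer.allocate(4 + footerLen + 4 + 4).order(ByteOrder.LITTLE_ENDIAN);
        buf.put("PAR1".getBytes(StandardCharsets.US_ASCII));
        buf.put(new byte[footerLen]); // stand-in for real footer metadata
        buf.putInt(footerLen);
        buf.put("PAR1".getBytes(StandardCharsets.US_ASCII));
        return buf.array();
    }

    public static void main(String[] args) throws IOException {
        Path block = spool(new ByteArrayInputStream(fakeBlock(5)));
        System.out.println("footer length = " + footerLength(block));
        Files.delete(block);
    }
}
```

The footer length cannot be known until the last bytes have arrived, which is
why a purely streaming reader is hard to build on this format.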

Interesting future asides:
o There is no need to define a data serializer (although Spark requires it)
o Parquet supports predicate pushdown and projection, which could be used
  between shuffle stages to improve performance in the future
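
To show what predicate pushdown could buy, here is a minimal Java sketch. It
does not use Parquet's filter API; the Chunk class and the method names are
hypothetical. It models a single long column split into chunks that each
carry a maximum-value statistic, and it skips any chunk whose statistic proves
that no row can satisfy a "greater than" predicate. Reading only this one
column rather than whole records is the projection half of the idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: not Parquet's filter or statistics API.
final class PushdownSketch {
    // One non-empty chunk of a long column plus its maximum value.
    static final class Chunk {
        final long[] values;
        final long max;

        Chunk(long... values) {
            this.values = values;
            this.max = Arrays.stream(values).max().getAsLong();
        }
    }

    // Keep only chunks that might contain a value greater than the threshold.
    static List<Chunk> chunksToScan(List<Chunk> column, long threshold) {
        List<Chunk> keep = new ArrayList<>();
        for (Chunk c : column) {
            if (c.max > threshold) {
                keep.add(c);
            }
        }
        return keep;
    }

    // Evaluate "value > threshold", touching only the chunks that survive.
    static List<Long> greaterThan(List<Chunk> column, long threshold) {
        List<Long> out = new ArrayList<>();
        for (Chunk c : chunksToScan(column, threshold)) {
            for (long v : c.values) {
                if (v > threshold) {
                    out.add(v);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Chunk> column = Arrays.asList(
            new Chunk(1L, 2L, 3L), new Chunk(10L, 20L), new Chunk(4L, 50L));
        System.out.println("chunks scanned = " + chunksToScan(column, 5L).size());
        System.out.println("matches        = " + greaterThan(column, 5L));
    }
}
```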





> Add new shuffle manager which stores shuffle blocks in Parquet
> --------------------------------------------------------------
>
>                 Key: SPARK-7263
>                 URL: https://issues.apache.org/jira/browse/SPARK-7263
>             Project: Spark
>          Issue Type: Improvement
>          Components: Block Manager
>            Reporter: Matt Massie



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
