[ https://issues.apache.org/jira/browse/SPARK-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14619236#comment-14619236 ]

Matt Massie commented on SPARK-7263:
------------------------------------

I'll start by stating the problem that I'm trying to solve: GC pressure and 
memory use. When Parquet data is first loaded in Spark, dictionaries keep the 
number of objects created low. However, on shuffle, all of the column metadata 
(including the dictionaries) is lost, because the Spark {{Serializer}} 
interface reads/writes objects in a record-oriented layout on disk without any 
metadata. When the shuffle data is reloaded through the Spark {{Serializer}}, 
memory use and GC pressure increase dramatically because of, for example, 
duplicated strings.
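
To make the duplication concrete, here's a minimal sketch (plain {{java.io}}, not Spark's actual {{Serializer}} API) of what a record-at-a-time layout costs when the same string value repeats across many records:

{code:scala}
import java.io.{ByteArrayOutputStream, DataOutputStream}

object RecordOrientedSketch {
  def main(args: Array[String]): Unit = {
    // One distinct key value repeated across a million records.
    val records = Iterator.fill(1000000)(("US", 1L))
    val out = new DataOutputStream(new ByteArrayOutputStream())
    // A record-oriented stream has to write each (key, value) pair independently:
    records.foreach { case (country, count) =>
      out.writeUTF(country) // the same string bytes are written a million times
      out.writeLong(count)
    }
    out.close()
    // A record-oriented reader then allocates a million distinct String objects
    // on the reduce side, whereas a Parquet dictionary page stores "US" once and
    // references it by id.
  }
}
{code}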

The {{SparkSqlSerializer}} and {{SparkSqlSerializer2}} implementations do not 
solve this problem. While they use the available schema information to keep the 
data as compact as possible, they are still limited by the Spark 
{{Serializer}} interface. This Parquet shuffle manager bypasses the Spark 
{{Serializer}} interface altogether and operates directly on {{File}}s, 
preserving all the metadata along the shuffle path. The result is smaller 
shuffle files, lower memory usage, and less GC pressure.
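
As a rough sketch of the write side of that idea, using the {{AvroParquetWriter}} builder API from parquet-avro (the shuffle-file plumbing and names here are illustrative, not the prototype's actual code):

{code:scala}
import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

object ParquetPartitionWriterSketch {
  // Write one map-output partition directly as a Parquet file, so the schema,
  // dictionaries and column metadata travel with the shuffle data.
  def writePartition(records: Iterator[IndexedRecord],
                     schema: Schema,
                     file: Path): Unit = {
    val writer = AvroParquetWriter.builder[IndexedRecord](file)
      .withSchema(schema)                               // explicit schema, no reflection
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .withDictionaryEncoding(true)                     // dictionaries survive the shuffle
      .build()
    try {
      records.foreach(r => writer.write(r))
    } finally {
      writer.close()
    }
  }
}
{code}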

I was originally considering supporting reflection for objects that don't have 
explicit schemas. In that case, it made sense to leverage the 
{{ScalaReflection}} => {{CatalystTypeConverters}} => {{CatalystConverter}} 
pathway. After writing a version of the Parquet shuffle that used it, I decided 
against supporting reflection. As it's written now, the Parquet shuffle only 
takes effect for objects that provide explicit schema information; all other 
objects are shuffled using the configured fallback shuffle manager. This 
approach makes the code cleaner, easier to maintain, and less error-prone.
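
The dispatch itself is simple. A hedged sketch of the idea (class and method names are illustrative, not the prototype's):

{code:scala}
import org.apache.avro.generic.IndexedRecord

object ShuffleDispatchSketch {
  // True when a class carries an explicit Avro schema (e.g. generated
  // SpecificRecords or GenericRecords), which is what the Parquet path needs.
  def hasExplicitSchema(cls: Class[_]): Boolean =
    classOf[IndexedRecord].isAssignableFrom(cls)

  // Pick the shuffle path for a dependency's value class; anything without an
  // explicit schema goes to the fallback shuffle manager.
  def choosePath(valueClass: Class[_]): String =
    if (hasExplicitSchema(valueClass)) "parquet"
    else "fallback" // e.g. the sort-based shuffle configured as the fallback
}
{code}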

For the problem that I'm trying to solve, there is no real benefit to 
converting Avro/Parquet -> Catalyst -> Avro/Parquet. There is also an issue 
with the schema round-trip in Spark SQL: if you load an Avro/Parquet file in 
Spark SQL and then save it using Spark SQL, you cannot reload it using the 
AvroParquet reader, because Spark SQL renames the root {{MessageType}} to 
"root", effectively stripping the type information. This is a minor point, 
though, since the bug could be fixed in Spark SQL.
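
To illustrate the round-trip issue (the schema text below is a made-up example, not output from Spark SQL): parquet-avro names the {{MessageType}} after the Avro record's full name, and after a Spark SQL save that name becomes just "root":

{code:scala}
import org.apache.parquet.schema.MessageTypeParser

object RootRenameIllustration {
  // Schema as written by parquet-avro: the message name carries the Avro
  // record's full name, which the AvroParquet reader uses to rebuild the type.
  val original = MessageTypeParser.parseMessageType(
    """message com.example.User {
      |  required binary name (UTF8);
      |  required int32 age;
      |}""".stripMargin)

  // Schema after a Spark SQL load/save round trip: same columns, but the
  // message name is now just "root", so the type information is gone.
  val afterSparkSqlSave = MessageTypeParser.parseMessageType(
    """message root {
      |  required binary name (UTF8);
      |  required int32 age;
      |}""".stripMargin)
}
{code}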

I'd like to hear more about the specific maintenance concerns that you have.

> Add new shuffle manager which stores shuffle blocks in Parquet
> --------------------------------------------------------------
>
>                 Key: SPARK-7263
>                 URL: https://issues.apache.org/jira/browse/SPARK-7263
>             Project: Spark
>          Issue Type: New Feature
>          Components: Block Manager
>            Reporter: Matt Massie
>
> I have a working prototype of this feature that can be viewed at
> https://github.com/apache/spark/compare/master...massie:parquet-shuffle?expand=1
> Setting the "spark.shuffle.manager" to "parquet" enables this shuffle manager.
> The dictionary support that Parquet provides appreciably reduces the amount of
> memory that objects use; however, once Parquet data is shuffled, all the
> dictionary information is lost and the column-oriented data is written to
> shuffle blocks in a record-oriented fashion. This shuffle manager addresses
> this issue by reading and writing all shuffle blocks in the Parquet format.
> If shuffle objects are Avro records, then the Avro SCHEMA$ is converted to a
> Parquet schema and used directly; otherwise, the Parquet schema is generated
> via reflection.
> Currently, the only non-Avro keys supported are primitive types. The reflection
> code can be improved (or replaced) to support complex records.
> The ParquetShufflePair class allows the shuffle key and value to be stored in
> Parquet blocks as a single record with a single schema.
> This commit adds the following new Spark configuration options:
> "spark.shuffle.parquet.compression" - sets the Parquet compression codec
> "spark.shuffle.parquet.blocksize" - sets the Parquet block size
> "spark.shuffle.parquet.pagesize" - set the Parquet page size
> "spark.shuffle.parquet.enabledictionary" - turns dictionary encoding on/off
> Parquet does not (and has no plans to) support a streaming API. Metadata
> sections are scattered throughout a Parquet file, making a streaming API
> difficult. As such, the ShuffleBlockFetcherIterator has been modified to fetch
> the entire contents of map outputs into temporary blocks before loading the
> data into the reducer.
> Interesting future asides:
> o There is no need to define a data serializer (although Spark requires it)
> o Parquet supports predicate pushdown and projection, which could be used
>   between shuffle stages to improve performance in the future


