Thanks for the ideas, Julien. I appreciate it. I suspected that I would need to direct my effort into a Parquet-based Spark shuffle but wanted to reach out on the Parquet list first to make sure that the simpler Parquet serialization approach wouldn't work.
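
Just to make sure I understand your dictionary-reuse suggestion, here is a rough sketch of what I think you mean, in Kryo terms. Everything below is hypothetical (nothing like it exists in ADAM today), and it assumes the set of possible values for a string field is known up front and shipped to every node, e.g. as a Spark broadcast variable:

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}

    // Hypothetical sketch: encode a string field as an index into a shared
    // dictionary that every node already has, falling back to the raw
    // string for values that are not in the dictionary.
    class DictionaryStringSerializer(dictionary: IndexedSeq[String])
        extends Serializer[String] {

      private val index: Map[String, Int] = dictionary.zipWithIndex.toMap

      override def write(kryo: Kryo, output: Output, value: String): Unit =
        index.get(value) match {
          case Some(i) =>
            output.writeBoolean(true)   // value is in the dictionary
            output.writeInt(i, true)    // variable-length encoded index
          case None =>
            output.writeBoolean(false)  // fall back to the raw string
            output.writeString(value)
        }

      override def read(kryo: Kryo, input: Input, clazz: Class[String]): String =
        if (input.readBoolean()) dictionary(input.readInt(true))
        else input.readString()
    }

Our per-record Avro serializer would then apply something like this to the dictionary-encoded fields instead of writing the full strings, which is why I'm wondering about getting the dictionaries back out of the Parquet files we load from.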
As far as I can tell, there isn't an external API for extracting dictionaries from Parquet files. Correct?

-Matt

On Tue, Apr 7, 2015 at 1:50 PM, Julien Le Dem <[email protected]> wrote:
> You might want to look at Twitter Chill: https://github.com/twitter/chill
> This is where Twitter has the Kryo serializers for this kind of use case.
> The one for Thrift:
> https://github.com/twitter/chill/blob/develop/chill-thrift/src/main/java/com/twitter/chill/thrift/TBaseSerializer.java
> Spark is using it already:
> https://github.com/apache/spark/blob/ee11be258251adf900680927ba200bf46512cc04/pom.xml#L320
>
> But the main problem is that you want to encode multiple records at a
> time, while the serializer gets a single key or value at a time.
> The output is not a single stream, since values get routed to a different
> node based on the key.
>
> Unless Spark first gets the list of values that go to a specific node
> before encoding them (and provides an API for custom encoding of all the
> values together), I don't think you can implement a Parquet KryoSerializer.
> I would ask the Spark project whether it's possible to encode all the
> values that go to the same node together with a custom serializer.
>
> Another approach is to reuse dictionary encoding in your row-oriented
> serialization. It would not compress as much as Parquet, but it may help.
> You'd need to pass the dictionary around, but maybe you know all the
> possible values ahead of time (extracting the dictionaries from the
> Parquet file?).
> After all, your in-memory representation is compact because, after
> dictionary decoding, all Avro structs that use the same string value
> point to the same Java String instance, so you may be able to use this.
>
> I hope this helps
>
> Julien
>
> On Tue, Apr 7, 2015 at 12:07 PM, Matt Massie <[email protected]> wrote:
> > I forgot to mention that I'm happy to do the work here; I'm mostly
> > looking for advice and pointers.
> >
> > Also, another interface that might be better for integration is the
> > Spark Serializer
> > <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/Serializer.scala>,
> > since the SerializationStream and DeserializationStream have the
> > flush() and close() functions that are necessary (instead of just
> > read/write).
> >
> > --
> > Matt <http://www.linkedin.com/in/mattmassie/> Massie
> > <http://www.twitter.com/matt_massie>
> > UC Berkeley AMPLab <https://twitter.com/amplab>
> >
> > On Tue, Apr 7, 2015 at 11:49 AM, Matt Massie <[email protected]> wrote:
> > > We are using Apache Parquet and Spark for a genome analysis platform
> > > called ADAM <http://bdgenomics.org> that allows researchers to
> > > quickly analyze large datasets of DNA, RNA, etc. Parquet has been a
> > > core component of the system, and we see compression of ~20%
> > > compared to specialized genome file formats, e.g. compressed BAM.
> > > In short, we're really happy with Parquet.
> > >
> > > We are using Avro Specific classes for almost all the entities in
> > > our system, so Avro generates Java classes from our schema
> > > <https://github.com/bigdatagenomics/bdg-formats/blob/master/src/main/resources/avro/bdg.avdl>.
> > > Since the AvroIndexedRecordConverter has dictionary support, our
> > > initial load from disk to memory is compact.
> > >
> > > That's the good news: compact on-disk and initial in-memory
> > > representation.
> > >
> > > Here's the problem: the Spark shuffle.
> > >
> > > In order to integrate Parquet with Spark, we use a KryoRegistrator
> > > to register Kryo serializers
> > > <https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala>
> > > for each of our Avro objects (see the Kryo Serializer interface
> > > <https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java>).
> > > We serialize each object into record-oriented Avro, which makes our
> > > intermediate shuffle files much larger than the corresponding
> > > column-oriented Parquet inputs. These large shuffle files are
> > > hurting our performance and limiting our scaling for some analyses.
> > >
> > > Since the shuffle data is short-lived, there's no need to store
> > > metadata, and we have immediate access to the schema through each
> > > Avro object. Each Avro specific class has a SCHEMA$ field which
> > > contains the Avro Schema for the object. There are utility
> > > functions in parquet-avro which can convert this Avro schema into a
> > > Parquet schema. We also don't need index pages, only the dictionary
> > > and data pages. We don't need predicate or projection functionality.
> > > Does anyone on this list see a way to create a Parquet Kryo
> > > serializer
> > > <https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java>
> > > to read/write Avro Specific objects to/from a stream? Emitting
> > > column-oriented data will understandably incur memory and CPU costs
> > > on the map side, but it will be worth it to improve our shuffle
> > > performance.
> > >
> > > This shuffle issue is slowing important research, so any advice you
> > > have to offer will be appreciated. Thank you.
> > >
> > > --
> > > Matt Massie
> > > UC Berkeley, AMPLab
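
For anyone skimming the thread, the per-object serializer we register today looks roughly like the sketch below (simplified from what the ADAMKryoRegistrator linked above does; treat it as a sketch rather than the exact code). Each record is written as length-prefixed Avro binary, which is exactly the record-oriented step I'd like to replace with a columnar encoding:

    import java.io.ByteArrayOutputStream

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.avro.Schema
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}
    import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}

    // Record-oriented shuffle serialization: each Avro specific object is
    // written on its own as length-prefixed Avro binary, so nothing is
    // shared across records -- no dictionary pages, no column chunks.
    class AvroBinarySerializer[T <: SpecificRecord](schema: Schema)
        extends Serializer[T] {

      private val writer = new SpecificDatumWriter[T](schema)
      private val reader = new SpecificDatumReader[T](schema)

      override def write(kryo: Kryo, output: Output, record: T): Unit = {
        val baos = new ByteArrayOutputStream()
        val encoder = EncoderFactory.get().binaryEncoder(baos, null)
        writer.write(record, encoder)
        encoder.flush()
        val bytes = baos.toByteArray
        output.writeInt(bytes.length, true)  // length prefix
        output.writeBytes(bytes)
      }

      override def read(kryo: Kryo, input: Input, clazz: Class[T]): T = {
        val bytes = input.readBytes(input.readInt(true))
        val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
        reader.read(null.asInstanceOf[T], decoder)
      }
    }

And the Avro-to-Parquet schema conversion I mentioned should be a one-liner with parquet-avro's AvroSchemaConverter (package parquet.avro before the org.apache.parquet rename, org.apache.parquet.avro after it). MyRecord below is just a placeholder for one of our generated bdg-formats classes:

    import org.apache.avro.Schema
    import parquet.avro.AvroSchemaConverter  // org.apache.parquet.avro in newer releases
    import parquet.schema.MessageType

    object ShuffleSchemas {
      // Turn a generated class's SCHEMA$ into the Parquet schema that a
      // columnar shuffle serializer would write, e.g.
      //   parquetSchemaFor(MyRecord.SCHEMA$)
      def parquetSchemaFor(avroSchema: Schema): MessageType =
        new AvroSchemaConverter().convert(avroSchema)
    }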
