You might want to look at Twitter Chill:
https://github.com/twitter/chill
This is where Twitter keeps its Kryo serializers for this kind of use case.

The one for Thrift:
https://github.com/twitter/chill/blob/develop/chill-thrift/src/main/java/com/twitter/chill/thrift/TBaseSerializer.java

Spark is already using it:
https://github.com/apache/spark/blob/ee11be258251adf900680927ba200bf46512cc04/pom.xml#L320

But the main problem is that you want to encode multiple records at a time,
whereas the serializer gets a single key or value at a time. The output is
not a single stream, since values get routed to different nodes based on the
key. Unless Spark first gets the list of values that go to a specific node
before encoding them (and provides an API for custom encoding of all values
together), I don't think you can implement a Parquet KryoSerializer.

I would ask the Spark project whether it's possible to encode all the values
that go to the same node together with a custom serializer.

Another approach is to reuse dictionary encoding in your row-oriented
serialization. It would not compress as much as Parquet but may help. You'd
need to pass the dictionary around, but maybe you know all the possible
values ahead of time (extracting the dictionaries from the Parquet file?).
After all, your in-memory representation is compact because, after dictionary
decoding, all Avro structs that use the same string value point to the same
Java string instance, so you may be able to use this.

I hope this helps
Julien

On Tue, Apr 7, 2015 at 12:07 PM, Matt Massie <[email protected]> wrote:

> I forgot to mention that I’m happy to do the work here, I’m mostly looking
> for advice and pointers.
>
> Also, another interface that might be better for integration is the Spark
> Serializer
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/Serializer.scala>
> since the SerializationStream and DeserializationStream have the flush()
> and close() functions that are necessary (instead of just read/write).
>
> --
> Matt <http://www.linkedin.com/in/mattmassie/> Massie
> <http://www.twitter.com/matt_massie>
> UC, Berkeley AMPLab <https://twitter.com/amplab>
>
> On Tue, Apr 7, 2015 at 11:49 AM, Matt Massie <[email protected]> wrote:
>
> > We are using Apache Parquet and Spark for a genome analysis platform,
> > called ADAM <http://bdgenomics.org>, that allows researchers to quickly
> > analyze large datasets of DNA, RNA, etc. Parquet has been a core
> > component of the system and we see compression of ~20% compared to
> > specialized genome file formats e.g. compressed BAM. In short, we’re
> > really happy with Parquet.
> >
> > We are using Avro Specific classes for almost all the entities in our
> > system, so Avro generates Java classes from our schema
> > <https://github.com/bigdatagenomics/bdg-formats/blob/master/src/main/resources/avro/bdg.avdl>.
> > Since the AvroIndexedRecordConverter has dictionary support, our initial
> > load from disk to memory is compact.
> >
> > That’s the good news: compact on-disk and initial in-memory
> > representation.
> >
> > Here’s the problem: the Spark shuffle.
> >
> > In order to integrate Parquet with Spark, we use a KryoRegistrator to
> > register Kryo serializers
> > <https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala>
> > for each of our Avro objects (see Kryo Serializer interface
> > <https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java>).
> > We are serializing each object into record-oriented Avro, which makes our
> > intermediate shuffle files much larger than the corresponding
> > column-oriented Parquet inputs. These large shuffle files are hurting our
> > performance and limiting our scaling for some analysis.
> >
> > Since the shuffle data is short-lived, there’s no need to store meta-data
> > and we have immediate access to schema through each Avro object. Each
> > Avro specific class has a SCHEMA$ field which contains the Avro Schema
> > for the object. There are utility functions in parquet-avro which can
> > convert this Avro schema into Parquet schema. We also don’t need index
> > pages, only the dictionary and data pages. We don’t need predicates or
> > projection functionality. Does anyone on this list see a way to create a
> > Parquet Kryo serializer
> > <https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java>
> > to read/write Avro Specific objects to/from a stream? Emitting
> > column-oriented data will understandably incur memory and CPU costs on
> > the map side but it will be worth it to improve our shuffle performance.
> >
> > This shuffle issue is slowing important research so any advice you have
> > to offer will be appreciated. Thank you.
> >
> > --
> > Matt Massie
> > UC Berkeley, AMPLab
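
The dictionary-encoding suggestion in the reply above can be sketched roughly
as follows. This is only an illustration under stated assumptions, not ADAM,
Kryo, or Parquet code: the class DictionaryRowCodec, the Read type, and its
contig/position fields are all hypothetical, and the dictionary is assumed to
be known ahead of time and shared by the writing and reading sides. Each
record is still written on its own (as a per-object serializer requires), but
a string found in the dictionary is written as an integer id, and decoding
hands back the shared string instance.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: row-oriented encoding with a pre-shared dictionary.
final class DictionaryRowCodec {

    // Hypothetical record type standing in for an Avro specific class.
    static final class Read {
        final String contig;
        final long position;

        Read(String contig, long position) {
            this.contig = contig;
            this.position = position;
        }
    }

    private final List<String> values;                          // id -> string
    private final Map<String, Integer> ids = new HashMap<>();   // string -> id

    // Assumption: both sides build the codec from the same ordered dictionary.
    DictionaryRowCodec(List<String> dictionary) {
        this.values = new ArrayList<>(dictionary);
        for (int i = 0; i < values.size(); i++) {
            ids.put(values.get(i), i);
        }
    }

    // Encodes one record at a time, as a per-object serializer would.
    byte[] encode(String contig, long position) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Integer id = ids.get(contig);
            if (id != null) {
                out.writeInt(id);        // known value: write the id only
            } else {
                out.writeInt(-1);        // unknown value: marker, then the string
                out.writeUTF(contig);
            }
            out.writeLong(position);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes one record; dictionary hits reuse the shared String instance.
    Read decode(byte[] record) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            int id = in.readInt();
            String contig = (id >= 0) ? values.get(id) : in.readUTF();
            long position = in.readLong();
            return new Read(contig, position);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sketch uses a fixed 4-byte id for simplicity; a variable-length integer
would be smaller, and the saving grows with the length of the strings being
replaced. In a real setup the same logic would presumably sit inside the
write/read methods of a custom serializer rather than produce byte arrays.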
