I forgot to mention that I’m happy to do the work here; I’m mostly looking
for advice and pointers.
Also, another interface that might be better for integration is the Spark
Serializer
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/Serializer.scala>,
since the SerializationStream and DeserializationStream have the flush()
and close() functions that are necessary (instead of just read/write).
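
To illustrate why flush() and close() matter, here is a minimal Java sketch
using only the standard library. A column-oriented writer cannot emit a
record as soon as it arrives; it has to buffer a batch and write it out
column by column, so it needs a hook for "write what you have so far".
ColumnBlockStream, writeObject(name, position) and ColumnBlockStreamDemo are
hypothetical names for illustration, not Spark or Parquet APIs, and the block
layout is made up.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stream (not a Spark class): writeObject() only buffers,
// flush() emits one column-oriented block, close() flushes and closes.
final class ColumnBlockStream implements Flushable, Closeable {
    private final DataOutputStream out;
    private final List<String> names = new ArrayList<>();
    private final List<Long> positions = new ArrayList<>();

    ColumnBlockStream(OutputStream sink) {
        this.out = new DataOutputStream(sink);
    }

    // A "record" here is just a (name, position) pair; nothing reaches the sink yet.
    ColumnBlockStream writeObject(String name, long position) {
        names.add(name);
        positions.add(position);
        return this;
    }

    @Override
    public void flush() {
        try {
            if (!names.isEmpty()) {
                out.writeInt(names.size());                 // block header: record count
                for (String n : names) out.writeUTF(n);     // column 1: all names
                for (long p : positions) out.writeLong(p);  // column 2: all positions
                names.clear();
                positions.clear();
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        flush(); // make sure the last partial block is written
        try {
            out.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

final class ColumnBlockStreamDemo {
    // Returns the sink size before and after close().
    static int[] sinkSizes() {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ColumnBlockStream stream = new ColumnBlockStream(sink);
        stream.writeObject("chr1", 100L).writeObject("chr1", 200L);
        int before = sink.size();
        stream.close();
        return new int[] { before, sink.size() };
    }
}
```

In this sketch nothing reaches the sink until flush() or close() is called,
which is the behaviour a plain read/write serializer interface gives no place
to express.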
--
Matt <http://www.linkedin.com/in/mattmassie/> Massie
<http://www.twitter.com/matt_massie>
UC, Berkeley AMPLab <https://twitter.com/amplab>
On Tue, Apr 7, 2015 at 11:49 AM, Matt Massie <[email protected]> wrote:
We are using Apache Parquet and Spark for a genome analysis platform,
called ADAM <http://bdgenomics.org>, that allows researchers to quickly
analyze large datasets of DNA, RNA, etc. Parquet has been a core component
of the system and we see compression of ~20% compared to specialized genome
file formats, e.g. compressed BAM. In short, we’re really happy with
Parquet.
We are using Avro Specific classes for almost all the entities in our
system, so Avro generates Java classes from our schema
<https://github.com/bigdatagenomics/bdg-formats/blob/master/src/main/resources/avro/bdg.avdl>.
Since the AvroIndexedRecordConverter has dictionary support, our initial
load from disk to memory is compact.
That’s the good news: compact on-disk and initial in-memory
representation.
Here’s the problem: the Spark shuffle.
In order to integrate Parquet with Spark, we use a KryoRegistrator to
register Kryo serializers
<https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala>
for each of our Avro objects (see Kryo Serializer interface
<https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java>).
We are serializing each object into record-oriented Avro, which makes our
intermediate shuffle files much larger than the corresponding
column-oriented Parquet inputs. These large shuffle files are hurting our
performance and limiting our scaling for some analysis.
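
As a rough illustration of the size difference, here is a minimal Java sketch
using only the standard library. It writes the same records once
record-by-record and once column-by-column with a dictionary for the repeated
string field. Alignment, LayoutSketch and the byte layouts are hypothetical
stand-ins, not our Avro classes or the actual Avro/Parquet encodings; real
Parquet also bit-packs dictionary ids and compresses pages, so the gap there
is larger than in this toy.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-field record standing in for an Avro specific class.
final class Alignment {
    final String contig;
    final long start;

    Alignment(String contig, long start) {
        this.contig = contig;
        this.start = start;
    }
}

final class LayoutSketch {
    // n records that alternate between two contig names.
    static List<Alignment> sample(int n) {
        List<Alignment> records = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            records.add(new Alignment(i % 2 == 0 ? "chr1" : "chr2", i * 100L));
        }
        return records;
    }

    // Record-oriented: every record repeats its contig string.
    static byte[] rowOriented(List<Alignment> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Alignment a : records) {
                out.writeUTF(a.contig);
                out.writeLong(a.start);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Column-oriented: dictionary first, then the id column, then the start column.
    static byte[] columnOriented(List<Alignment> records) {
        Map<String, Integer> dictionary = new LinkedHashMap<>();
        for (Alignment a : records) {
            dictionary.putIfAbsent(a.contig, dictionary.size());
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(dictionary.size());
            for (String entry : dictionary.keySet()) {
                out.writeUTF(entry);                      // each distinct string once
            }
            out.writeInt(records.size());
            for (Alignment a : records) {
                out.writeInt(dictionary.get(a.contig));   // small id instead of the string
            }
            for (Alignment a : records) {
                out.writeLong(a.start);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

With LayoutSketch.sample(1000), the record-oriented form comes to 14,000
bytes and the column-oriented form to 12,020 bytes, even before any
compression and with a deliberately wasteful 4-byte dictionary id.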
Since the shuffle data is short-lived, there’s no need to store metadata, and
we have immediate access to the schema through each Avro object. Each Avro
specific class has a SCHEMA$ field which contains the Avro Schema for the
object. There are utility functions in parquet-avro which can convert
this Avro schema into a Parquet schema. We also don’t need index pages, only
the dictionary and data pages. We don’t need predicates or projection
functionality. Does anyone on this list see a way to create a Parquet Kryo
serializer
<https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java>
to read/write Avro Specific objects to/from a stream? Emitting
column-oriented data will understandably incur memory and CPU costs on the
map side, but it will be worth it to improve our shuffle performance.
This shuffle issue is slowing important research, so any advice you have to
offer will be appreciated. Thank you.
--
Matt Massie
UC Berkeley, AMPLab