I did a hackathon project last year to get Spark working more smoothly with Avro, which you can take a look at here:

  https://github.com/rdblue/spark/commit/6e59f4d1

That fixes some cases with Avro records in Kryo where you have the schema on both ends and don't need to serialize the schema with every record, as with specific and reflect records. It also adds a typed stream, which works more like classic MR and allows you to set the schema for all of the records passing through a shuffle, without going through Kryo. This could save quite a bit of space, though not as much as Parquet would.

Julien is right that the main problem with using Parquet here is that its benefits come from encoding multiple records at once, while the serializer only sees one record at a time.

About the custom dictionaries, though, if you know which fields are getting good dictionary compression, you might be able to define an Avro schema that does the dictionary-encoding work via Avro enum. You could sample the data to find out the appropriate dictionary and generate the Avro schema that way.
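
To make the sampling idea concrete, here is a rough sketch in Python that only builds the schema JSON and does not touch the Avro library. The field names (contig, strand, sequence), the record name, and the max_symbols threshold are made up for illustration. It also assumes every non-enum field is a string, which a real schema would not. Note that Avro enum symbols have to look like identifiers, so values that don't match are left as plain strings, and any value missing from the sample could not be written with the generated schema.

```python
import json
import re
from collections import Counter

# Avro enum symbols must match this pattern, so other values cannot become symbols.
_SYMBOL = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def enum_candidates(records, max_symbols=16):
    """Return {field: sorted distinct values} for fields that look enum-like in the sample."""
    values = {}
    for rec in records:
        for name, val in rec.items():
            values.setdefault(name, Counter())[val] += 1
    out = {}
    for name, counts in values.items():
        if (len(counts) <= max_symbols
                and all(isinstance(v, str) and _SYMBOL.match(v) for v in counts)):
            out[name] = sorted(counts)
    return out

def build_schema(record_name, records, max_symbols=16):
    """Build an Avro record schema (as a dict) that uses enums for the enum-like fields."""
    enums = enum_candidates(records, max_symbols)
    fields = []
    for name in records[0]:
        if name in enums:
            ftype = {"type": "enum", "name": name + "_enum", "symbols": enums[name]}
        else:
            ftype = "string"  # simplification: every other field is treated as a string
        fields.append({"name": name, "type": ftype})
    return {"type": "record", "name": record_name, "fields": fields}

# Hypothetical sample; in practice this would be drawn from the real data.
sample = [
    {"contig": "chr1", "strand": "FORWARD", "sequence": "ACGTTGCA"},
    {"contig": "chr1", "strand": "REVERSE", "sequence": "TTGACCAG"},
    {"contig": "chr2", "strand": "FORWARD", "sequence": "GGCATCAA"},
]
schema = build_schema("Read", sample, max_symbols=2)
print(json.dumps(schema, indent=2))
```

With this sample, contig and strand come out as enums and sequence stays a string because it has more distinct values than the threshold allows.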

rb

On 04/07/2015 01:50 PM, Julien Le Dem wrote:
You might want to look at Twitter Chill: https://github.com/twitter/chill
This is where Twitter has the Kryo serializers for this kind of use case.
The one for Thrift:
https://github.com/twitter/chill/blob/develop/chill-thrift/src/main/java/com/twitter/chill/thrift/TBaseSerializer.java
Spark is using it already:
https://github.com/apache/spark/blob/ee11be258251adf900680927ba200bf46512cc04/pom.xml#L320

But the main problem is that you want to encode multiple records at a time
when the serializer gets a single key or value at a time.
The output is not a single stream since values get routed to a different
node based on the key.

Unless Spark first gets the list of values that goes to a specific node
before encoding them (and provides an API to have custom encoding of all
values together) I don't think you can implement a Parquet KryoSerializer.
I would ask the Spark project whether it's possible to encode all the
values that go to the same node together with a custom serializer.
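
As a toy illustration of why grouping matters, the sketch below compares encoding each value on its own against grouping values by target partition first. Everything in it is a stand-in: JSON plus zlib takes the place of a real columnar encoding, partition_of is a hypothetical partitioner, and the records are made up. It is not Spark or Parquet code.

```python
import json
import zlib
from collections import defaultdict

def partition_of(key, num_partitions):
    # Stand-in for a hash partitioner; deterministic for the string keys used here.
    return sum(key.encode("utf-8")) % num_partitions

def per_record_size(pairs):
    """Total bytes when every value is encoded and compressed on its own."""
    return sum(len(zlib.compress(json.dumps(v).encode("utf-8"))) for _, v in pairs)

def batched_size(pairs, num_partitions):
    """Total bytes when values are grouped by target partition, then encoded together."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[partition_of(k, num_partitions)].append(v)
    return sum(len(zlib.compress(json.dumps(vs).encode("utf-8"))) for vs in groups.values())

# Made-up records with highly repetitive field values.
pairs = [("read%d" % i, {"contig": "chr1", "strand": "FORWARD", "pos": i}) for i in range(1000)]
print(per_record_size(pairs), batched_size(pairs, num_partitions=4))
```

The batched total should come out much smaller, since the repeated values are only visible to the encoder when the records are handed to it together.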

Another approach is to reuse dictionary encoding in your row oriented
serialization.
It would not compress as much as Parquet but may help. You'd need to pass
the dictionary around but maybe you know all the possible values ahead of
time. (extracting the dictionaries from the Parquet file?)
After all, your in-memory representation is compact because, after dictionary
decoding, all Avro structs that use the same string value point to the same
Java string instance, so you may be able to use this.
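
A minimal sketch of that idea, assuming the dictionary is known ahead of time and shared by both ends. DictionaryCodec is a hypothetical helper, not part of Kryo, Avro, or Parquet, and the 2-byte id width is an arbitrary choice:

```python
import struct

class DictionaryCodec:
    """Maps known string values to small integer ids; both ends must share the dictionary."""

    def __init__(self, values):
        self.values = list(values)
        self.ids = {v: i for i, v in enumerate(self.values)}

    def encode(self, value):
        # 2-byte unsigned id; a KeyError means the value was not in the dictionary.
        return struct.pack(">H", self.ids[value])

    def decode(self, data):
        (idx,) = struct.unpack(">H", data)
        # Returns the single shared string instance held by the dictionary.
        return self.values[idx]

codec = DictionaryCodec(["chr1", "chr2", "chrX"])
encoded = [codec.encode(v) for v in ["chr2", "chr2", "chrX"]]
decoded = [codec.decode(b) for b in encoded]
print(decoded, sum(len(b) for b in encoded))
```

Decoding hands back the same string object for repeated values, which is the sharing described above. Values outside the dictionary would need a fallback path that this sketch leaves out.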

I hope this helps

Julien



On Tue, Apr 7, 2015 at 12:07 PM, Matt Massie <[email protected]> wrote:

I forgot to mention that I’m happy to do the work here, I’m mostly looking
for advice and pointers.

Also, another interface that might be better for integration is the Spark
Serializer
<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/Serializer.scala>
since the SerializationStream and DeserializationStream have the flush()
and close() functions that are necessary (instead of just read/write).

--
Matt <http://www.linkedin.com/in/mattmassie/> Massie
<http://www.twitter.com/matt_massie>
UC, Berkeley AMPLab <https://twitter.com/amplab>

On Tue, Apr 7, 2015 at 11:49 AM, Matt Massie <[email protected]> wrote:

We are using Apache Parquet and Spark for a genome analysis platform,
called ADAM <http://bdgenomics.org>, that allows researchers to quickly
analyze large datasets of DNA, RNA, etc. Parquet has been a core component
of the system and we see compression of ~20% compared to specialized genome
file formats e.g. compressed BAM. In short, we’re really happy with Parquet.

We are using Avro Specific classes for almost all the entities in our
system, so Avro generates Java classes from our schema
<https://github.com/bigdatagenomics/bdg-formats/blob/master/src/main/resources/avro/bdg.avdl>.
Since the AvroIndexedRecordConverter has dictionary support, our initial
load from disk to memory is compact.

That’s the good news: compact on-disk and initial in-memory representation.

Here’s the problem: the Spark shuffle.

In order to integrate Parquet with Spark, we use a KryoRegistrator to
register Kryo serializers
<https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala>
for each of our Avro objects (see Kryo Serializer interface
<https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java>).
We are serializing each object into record-oriented Avro, which makes our
intermediate shuffle files much larger than the corresponding
column-oriented Parquet inputs. These large shuffle files are hurting our
performance and limiting our scaling for some analysis.

Since the shuffle data is short-lived, there’s no need to store metadata and
we have immediate access to the schema through each Avro object. Each Avro
specific class has a SCHEMA$ field which contains the Avro Schema for the
object. There are utility functions in parquet-avro which can convert
this Avro schema into a Parquet schema. We also don’t need index pages, only
the dictionary and data pages. We don’t need predicates or projection
functionality. Does anyone on this list see a way to create a Parquet Kryo
serializer
<https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java>
to read/write Avro Specific objects to/from a stream? Emitting
column-oriented data will understandably incur memory and CPU costs on the
map side but it will be worth it to improve our shuffle performance.

This shuffle issue is slowing important research so any advice you have to
offer will be appreciated. Thank you.

—
Matt Massie
UC Berkeley, AMPLab

--
Ryan Blue
Software Engineer
Cloudera, Inc.
