*tl;dr Hadoop and Cascading provide ways of writing tuples to multiple
output files based on key, but the plain RDD interface doesn't seem to,
and it should.*

I have been looking into ways to write to multiple outputs in Spark. It
seems to be a feature that Spark is currently missing.

The idea is to partition the output and write the elements of an RDD to
different locations depending on the key. For example, in a pair RDD your
key may be (language, date, userId) and you would like to write separate
files to $someBasePath/$language/$date. There could then be a version of
saveAsHadoopDataset that is able to write to multiple locations based on
the key using the underlying OutputFormat. Perhaps it would take a pair
RDD with keys ($partitionKey, $realKey), so for example ((language, date),
userId).
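
To make the proposal concrete, here is a sketch of the call site. Only the
data shaping is real; saveAsHadoopDatasetByKey is a name I've invented for
illustration and does not exist in Spark:

    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("multi-output-sketch"))

    // Composite key: ($partitionKey, $realKey), here ((language, date), userId)
    val events = sc.parallelize(Seq(
      (("en", "2015-06-22"), "user-1"),
      (("fr", "2015-06-22"), "user-2")
    ))

    // Hypothetical call, named for illustration only; it would write records
    // under $someBasePath/en/2015-06-22 and $someBasePath/fr/2015-06-22 via
    // the underlying OutputFormat:
    // events.saveAsHadoopDatasetByKey(new JobConf(sc.hadoopConfiguration))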

The prior art I have found on this is as follows.

Using Spark SQL:
The 'partitionBy' method of DataFrameWriter:
https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

This only works for Parquet at the moment.
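
For reference, usage looks like this (a quick sketch against Spark 1.4,
assuming an existing SparkContext sc):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = Seq(
      ("en", "2015-06-22", "user-1"),
      ("fr", "2015-06-22", "user-2")
    ).toDF("language", "date", "userId")

    // writes Hive-style directories, e.g.
    // /some/base/path/language=en/date=2015-06-22/...
    df.write.partitionBy("language", "date").parquet("/some/base/path")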

Using Spark/Hadoop:
This pull request (using the old hadoop1 API):
https://github.com/apache/spark/pull/4895/files

This uses MultipleTextOutputFormat (which in turn uses
MultipleOutputFormat), which is part of the old hadoop1 API. It only works
for text, but it could be generalised to any underlying OutputFormat by
using MultipleOutputFormat directly (though only for hadoop1, which
doesn't support ParquetAvroOutputFormat, for example).
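
For the plain-text case this pattern already works without any changes to
Spark. A minimal sketch with the old API, where key.toString chooses the
subdirectory and the key is dropped from the written line:

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

    class KeyBasedTextOutput extends MultipleTextOutputFormat[Any, Any] {
      // write only the value into the file, not the key
      override def generateActualKey(key: Any, value: Any): Any =
        NullWritable.get()

      // e.g. a key of "en/2015-06-22" yields en/2015-06-22/part-00000
      override def generateFileNameForKeyValue(key: Any, value: Any,
                                               name: String): String =
        key.toString + "/" + name
    }

    // usage, assuming rdd: RDD[(String, String)]
    rdd.saveAsHadoopFile("/some/base/path", classOf[String], classOf[String],
      classOf[KeyBasedTextOutput])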

This gist (with the hadoop2 API):
https://gist.github.com/mlehman/df9546f6be2e362bbad2

This uses MultipleOutputs (available for both the old and new hadoop APIs)
and extends saveAsNewAPIHadoopDataset to support multiple outputs. It
should work for any underlying OutputFormat. It would probably be best
implemented by extending both saveAsHadoopDataset and
saveAsNewAPIHadoopDataset.
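
For context, this is roughly how MultipleOutputs fans records out in a
plain Hadoop (new API) reducer; the baseOutputPath argument is what makes
per-key directories possible. The Spark versions have to synthesize the
task context that the MultipleOutputs constructor expects:

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapreduce.Reducer
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs

    class PartitioningReducer extends Reducer[Text, Text, Text, Text] {
      private var out: MultipleOutputs[Text, Text] = _

      override def setup(ctx: Reducer[Text, Text, Text, Text]#Context): Unit =
        out = new MultipleOutputs[Text, Text](ctx)

      override def reduce(key: Text, values: java.lang.Iterable[Text],
                          ctx: Reducer[Text, Text, Text, Text]#Context): Unit = {
        val it = values.iterator()
        while (it.hasNext) {
          // baseOutputPath is resolved relative to the job output directory,
          // so each distinct key gets its own subdirectory of files
          out.write(key, it.next(), key.toString + "/part")
        }
      }

      override def cleanup(ctx: Reducer[Text, Text, Text, Text]#Context): Unit =
        out.close()
    }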

In Cascading:
Cascading provides PartitionTap to do this:
http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/local/PartitionTap.html

So my questions are: is there a reason why Spark doesn't provide this? Does
Spark provide similar functionality through some other mechanism? How would
it be best implemented?

Since I started composing this message I've had a go at writing a wrapper
OutputFormat that writes multiple outputs using Hadoop's MultipleOutputs,
but doesn't require modification of PairRDDFunctions. The principle is
similar, however. Again, it feels slightly hacky to use dummy fields for
the ReduceContextImpl, but some of this may be part of the impedance
mismatch between Spark and plain Hadoop... Here is my attempt:
https://gist.github.com/silasdavis/d1d1f1f7ab78249af462
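
To illustrate the general shape of the wrapper idea, here is a
much-simplified, hypothetical sketch (not the code in the gist): keep one
lazily-created delegate RecordWriter per partition key and route each
record to it, with keys assumed to be (partitionPath, realKey) pairs:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
    import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat,
      TextOutputFormat}
    import scala.collection.mutable

    class PartitionedTextOutputFormat
        extends FileOutputFormat[(String, Text), Text] {
      override def getRecordWriter(context: TaskAttemptContext)
          : RecordWriter[(String, Text), Text] =
        new RecordWriter[(String, Text), Text] {
          private val writers =
            mutable.Map.empty[String, RecordWriter[Text, Text]]

          private def writerFor(partition: String): RecordWriter[Text, Text] =
            writers.getOrElseUpdate(partition, {
              // a plain TextOutputFormat whose work file is nested under the
              // partition subdirectory
              val delegate = new TextOutputFormat[Text, Text] {
                override def getDefaultWorkFile(ctx: TaskAttemptContext,
                                                ext: String): Path = {
                  val file = super.getDefaultWorkFile(ctx, ext)
                  new Path(new Path(file.getParent, partition), file.getName)
                }
              }
              delegate.getRecordWriter(context)
            })

          override def write(key: (String, Text), value: Text): Unit =
            writerFor(key._1).write(key._2, value)

          override def close(ctx: TaskAttemptContext): Unit =
            writers.values.foreach(_.close(ctx))
        }
    }

This relies on FileOutputCommitter promoting the whole task work directory
tree on commit, so the partition subdirectories survive into the final
output.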

I'd like to see this functionality in Spark somehow, but I invite
suggestions on how best to achieve it.

Thanks,
Silas
