Re: Writing to multiple outputs in Spark

2015-08-17 Thread Silas Davis
@Reynold Xin: not really: it only works for Parquet (see partitionBy:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter),
and it requires you to have a DataFrame in the first place (for my use case
the Spark SQL interface to Avro records is more of a hindrance than a help,
since I want to use generated Java classes rather than treat Avro records as
generic tables via Rows). Even if I do have a DataFrame, if I need to map or
mapPartitions I lose that interface and have to create a new DataFrame from
an RDD[Row], which isn't very convenient or efficient.
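
To make that last point concrete, here is a minimal sketch of the round trip
(Spark 1.4-era API; the SQLContext `sqlContext` and a DataFrame `df` with
language/date/userId columns are assumptions for illustration):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // Transform rows; after mapPartitions we are back to a plain RDD[Row].
    val transformed = df.rdd.mapPartitions { rows =>
      rows.map(r => Row(r.getString(0), r.getString(1), r.getLong(2)))
    }

    // The schema has to be restated to get a DataFrame (and partitionBy) back.
    val schema = StructType(Seq(
      StructField("language", StringType),
      StructField("date", StringType),
      StructField("userId", LongType)))
    val df2 = sqlContext.createDataFrame(transformed, schema)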

Has anyone been able to take a look at my gist:
https://gist.github.com/silasdavis/d1d1f1f7ab78249af462? The first 100
lines provide a base class for multiple-outputs OutputFormats; then see line
269 for an example of how to use such an OutputFormat:
https://gist.github.com/silasdavis/d1d1f1f7ab78249af462#file-multipleoutputs-scala-L269

@Alex Angelini, the code there would support your use case without
modifying Spark (it uses saveAsNewAPIHadoopFile and a multiple-outputs
wrapper format).
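
Roughly, usage looks like the sketch below (the `events` RDD, its fields, and
the wrapper class name are placeholders for illustration, not the actual names
from the gist; the idea is just to key each record by the sub-directory it
should land in):

    import org.apache.hadoop.io.Text

    // Hypothetical: key each record by its year/month/day output path.
    val keyed = events.map { e =>
      (new Text(s"${e.year}/${e.month}/${e.day}"), new Text(e.payload))
    }

    // Hypothetical wrapper OutputFormat that routes records to a directory
    // derived from the key, delegating the actual writing to an underlying
    // OutputFormat such as TextOutputFormat.
    keyed.saveAsNewAPIHadoopFile(
      "/some/base/path",
      classOf[Text],
      classOf[Text],
      classOf[MultipleOutputsWrapperFormat])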

@Nicholas Chammas, I'll post a link to my gist on that ticket.

On Fri, 14 Aug 2015 at 21:10 Reynold Xin r...@databricks.com wrote:

 This is already supported with the new partitioned data sources in
 DataFrame/SQL right?


 On Fri, Aug 14, 2015 at 8:04 AM, Alex Angelini alex.angel...@shopify.com
 wrote:

 Speaking about Shopify's deployment, this would be a really nice-to-have
 feature.

 We would like to write data to folders with the structure
 `year/month/day` but have had to hold off on that because of the lack
 of support for MultipleOutputs.

 On Fri, Aug 14, 2015 at 10:56 AM, Silas Davis si...@silasdavis.net
 wrote:

 Would it be right to assume that the silence on this topic implies
 others don't really have this issue/desire?

 On Sat, 18 Jul 2015 at 17:24 Silas Davis si...@silasdavis.net wrote:

 *tl;dr: hadoop and cascading provide ways of writing tuples to multiple
 output files based on key, but the plain RDD interface doesn't seem to, and
 it should.*

 I have been looking into ways to write to multiple outputs in Spark. It
 seems like a feature that is somewhat missing from Spark.

 The idea is to partition output and write the elements of an RDD to
 different locations based on the key. For example, in a pair RDD your key
 may be (language, date, userId) and you would like to write separate files
 to $someBasePath/$language/$date. Then there would be a version of
 saveAsHadoopDataset that would be able to write to multiple locations based
 on key using the underlying OutputFormat. Perhaps it would take a pair RDD
 with keys ($partitionKey, $realKey), so for example ((language, date),
 userId).
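
 In rough pseudo-Scala (`records` is a stand-in pair RDD and the save method
 is hypothetical, shown only to illustrate the proposed keying convention; it
 does not exist in Spark):

     // Key by the output partition, keep the real key alongside the value.
     val byPartition = records.map { case ((language, date, userId), value) =>
       ((language, date), (userId, value))
     }
     // Hypothetical API:
     // byPartition.saveAsHadoopDatasetByPartitionKey(jobConf) {
     //   case (language, date) => s"$someBasePath/$language/$date"
     // }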

 The prior art I have found on this is the following.

 Using SparkSQL:
 The 'partitionBy' method of DataFrameWriter:
 https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

 This only works for parquet at the moment.
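
 For completeness, the happy path with that API is very short, assuming a
 DataFrame `df` that already has language and date columns (Spark 1.4):

     df.write
       .partitionBy("language", "date")
       .parquet("/some/base/path")
     // writes /some/base/path/language=en/date=2015-08-14/part-*.parquet etc.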

 Using Spark/Hadoop:
 This pull request (using the hadoop1 API):
 https://github.com/apache/spark/pull/4895/files

 This uses MultipleTextOutputFormat (which in turn uses
 MultipleOutputFormat), which is part of the old hadoop1 API. It only works
 for text but could be generalised for any underlying OutputFormat by using
 MultipleOutputFormat (but only for hadoop1, which doesn't support
 ParquetAvroOutputFormat, for example).
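
 A minimal sketch of that hadoop1 route (the class below is illustrative, not
 the code from the pull request): the output file name is derived from the
 key, and the key itself is dropped from the written records.

     import org.apache.hadoop.io.NullWritable
     import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

     class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
       // e.g. key "en/2015-08-14" -> en/2015-08-14/part-00000
       override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
         s"$key/$name"

       // don't write the key into the output files
       override def generateActualKey(key: Any, value: Any): Any =
         NullWritable.get()
     }

     // rdd: RDD[(String, String)] keyed by "language/date"
     // rdd.saveAsHadoopFile("/some/base/path", classOf[String], classOf[String],
     //   classOf[KeyBasedOutput])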

 This gist (with the hadoop2 API):
 https://gist.github.com/mlehman/df9546f6be2e362bbad2

 This uses MultipleOutputs (available for both the old and new hadoop
 APIs) and extends saveAsNewAPIHadoopDataset to support multiple outputs.
 It should work for any underlying OutputFormat. It would probably be better
 implemented by extending saveAs[NewAPI]HadoopDataset.

 In Cascading:
 Cascading provides PartitionTap:
 http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/local/PartitionTap.html
 to do this.

 So my questions are: is there a reason why Spark doesn't provide this?
 Does Spark provide similar functionality through some other mechanism? How
 would it be best implemented?

 Since I started composing this message I've had a go at writing a wrapper
 OutputFormat that writes multiple outputs using hadoop MultipleOutputs but
 doesn't require modification of PairRDDFunctions. The principle is similar,
 however. Again, it feels slightly hacky to use dummy fields for the
 ReduceContextImpl, but some of this may be part of the impedance mismatch
 between Spark and plain Hadoop... Here is my attempt:
 https://gist.github.com/silasdavis/d1d1f1f7ab78249af462

 I'd like to see this functionality in Spark somehow, but I invite
 suggestions on how best to achieve it.

 Thanks,
 Silas






Re: Writing to multiple outputs in Spark

2015-08-14 Thread Abhishek R. Singh
A workaround would be to have multiple passes over the RDD and have each pass
write its own output?

Or, in a foreachPartition, do it in a single pass (open up multiple files per
partition to write out)?
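
The first workaround, as a minimal sketch (assuming a pair RDD `records`
keyed by (language, date); one filter-and-save pass per distinct key, so it
only really pays off when the number of keys is small):

    val cached = records.cache()
    val keys = cached.keys.distinct().collect()

    for ((language, date) <- keys) {
      cached
        .filter { case (k, _) => k == (language, date) }
        .values
        .saveAsTextFile(s"/some/base/path/$language/$date")
    }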

-Abhishek-

On Aug 14, 2015, at 7:56 AM, Silas Davis si...@silasdavis.net wrote:

 Would it be right to assume that the silence on this topic implies others 
 don't really have this issue/desire?
 
 On Sat, 18 Jul 2015 at 17:24 Silas Davis si...@silasdavis.net wrote:
 tl;dr: hadoop and cascading provide ways of writing tuples to multiple output
 files based on key, but the plain RDD interface doesn't seem to, and it should.
 
 I have been looking into ways to write to multiple outputs in Spark. It seems 
 like a feature that is somewhat missing from Spark.
 
 The idea is to partition output and write the elements of an RDD to different
 locations based on the key. For example, in a pair RDD your key may be
 (language, date, userId) and you would like to write separate files to
 $someBasePath/$language/$date. Then there would be a version of
 saveAsHadoopDataset that would be able to write to multiple locations based on
 key using the underlying OutputFormat. Perhaps it would take a pair RDD with
 keys ($partitionKey, $realKey), so for example ((language, date), userId).
 
 The prior art I have found on this is the following.
 
 Using SparkSQL:
 The 'partitionBy' method of DataFrameWriter: 
 https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
 
 This only works for parquet at the moment.
 
 Using Spark/Hadoop:
 This pull request (using the hadoop1 API):
 https://github.com/apache/spark/pull/4895/files
 
 This uses MultipleTextOutputFormat (which in turn uses MultipleOutputFormat),
 which is part of the old hadoop1 API. It only works for text but could be
 generalised for any underlying OutputFormat by using MultipleOutputFormat
 (but only for hadoop1, which doesn't support ParquetAvroOutputFormat, for
 example).
 
 This gist (with the hadoop2 API):
 https://gist.github.com/mlehman/df9546f6be2e362bbad2
 
 This uses MultipleOutputs (available for both the old and new hadoop APIs)
 and extends saveAsNewAPIHadoopDataset to support multiple outputs. It should
 work for any underlying OutputFormat. It would probably be better implemented
 by extending saveAs[NewAPI]HadoopDataset.
 
 In Cascading:
 Cascading provides PartitionTap:
 http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/local/PartitionTap.html
 to do this.
 
 So my questions are: is there a reason why Spark doesn't provide this? Does 
 Spark provide similar functionality through some other mechanism? How would 
 it be best implemented?
 
 Since I started composing this message I've had a go at writing a wrapper
 OutputFormat that writes multiple outputs using hadoop MultipleOutputs but
 doesn't require modification of PairRDDFunctions. The principle is similar,
 however. Again, it feels slightly hacky to use dummy fields for the
 ReduceContextImpl, but some of this may be part of the impedance mismatch
 between Spark and plain Hadoop... Here is my attempt:
 https://gist.github.com/silasdavis/d1d1f1f7ab78249af462
 
 I'd like to see this functionality in Spark somehow, but I invite suggestions
 on how best to achieve it.
 
 Thanks,
 Silas



Re: Writing to multiple outputs in Spark

2015-08-14 Thread Reynold Xin
This is already supported with the new partitioned data sources in
DataFrame/SQL right?


On Fri, Aug 14, 2015 at 8:04 AM, Alex Angelini alex.angel...@shopify.com
wrote:

 Speaking about Shopify's deployment, this would be a really nice-to-have
 feature.

 We would like to write data to folders with the structure
 `year/month/day` but have had to hold off on that because of the lack
 of support for MultipleOutputs.

 On Fri, Aug 14, 2015 at 10:56 AM, Silas Davis si...@silasdavis.net
 wrote:

 Would it be right to assume that the silence on this topic implies others
 don't really have this issue/desire?

 On Sat, 18 Jul 2015 at 17:24 Silas Davis si...@silasdavis.net wrote:

 *tl;dr: hadoop and cascading provide ways of writing tuples to multiple
 output files based on key, but the plain RDD interface doesn't seem to, and
 it should.*

 I have been looking into ways to write to multiple outputs in Spark. It
 seems like a feature that is somewhat missing from Spark.

 The idea is to partition output and write the elements of an RDD to
 different locations based on the key. For example, in a pair RDD your key
 may be (language, date, userId) and you would like to write separate files
 to $someBasePath/$language/$date. Then there would be a version of
 saveAsHadoopDataset that would be able to write to multiple locations based
 on key using the underlying OutputFormat. Perhaps it would take a pair RDD
 with keys ($partitionKey, $realKey), so for example ((language, date),
 userId).

 The prior art I have found on this is the following.

 Using SparkSQL:
 The 'partitionBy' method of DataFrameWriter:
 https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

 This only works for parquet at the moment.

 Using Spark/Hadoop:
 This pull request (using the hadoop1 API):
 https://github.com/apache/spark/pull/4895/files

 This uses MultipleTextOutputFormat (which in turn uses
 MultipleOutputFormat), which is part of the old hadoop1 API. It only works
 for text but could be generalised for any underlying OutputFormat by using
 MultipleOutputFormat (but only for hadoop1, which doesn't support
 ParquetAvroOutputFormat, for example).

 This gist (with the hadoop2 API):
 https://gist.github.com/mlehman/df9546f6be2e362bbad2

 This uses MultipleOutputs (available for both the old and new hadoop
 APIs) and extends saveAsNewAPIHadoopDataset to support multiple outputs.
 It should work for any underlying OutputFormat. It would probably be better
 implemented by extending saveAs[NewAPI]HadoopDataset.

 In Cascading:
 Cascading provides PartitionTap:
 http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/local/PartitionTap.html
 to do this.

 So my questions are: is there a reason why Spark doesn't provide this?
 Does Spark provide similar functionality through some other mechanism? How
 would it be best implemented?

 Since I started composing this message I've had a go at writing a wrapper
 OutputFormat that writes multiple outputs using hadoop MultipleOutputs but
 doesn't require modification of PairRDDFunctions. The principle is similar,
 however. Again, it feels slightly hacky to use dummy fields for the
 ReduceContextImpl, but some of this may be part of the impedance mismatch
 between Spark and plain Hadoop... Here is my attempt:
 https://gist.github.com/silasdavis/d1d1f1f7ab78249af462

 I'd like to see this functionality in Spark somehow, but I invite
 suggestions on how best to achieve it.

 Thanks,
 Silas





Re: Writing to multiple outputs in Spark

2015-08-14 Thread Nicholas Chammas
See: https://issues.apache.org/jira/browse/SPARK-3533

Feel free to comment there and make a case if you think the issue should be
reopened.

Nick

On Fri, Aug 14, 2015 at 11:11 AM Abhishek R. Singh 
abhis...@tetrationanalytics.com wrote:

 A workaround would be to have multiple passes over the RDD and have each pass
 write its own output?

 Or, in a foreachPartition, do it in a single pass (open up multiple files
 per partition to write out)?

 -Abhishek-

 On Aug 14, 2015, at 7:56 AM, Silas Davis si...@silasdavis.net wrote:

 Would it be right to assume that the silence on this topic implies others
 don't really have this issue/desire?

 On Sat, 18 Jul 2015 at 17:24 Silas Davis si...@silasdavis.net wrote:

 *tl;dr: hadoop and cascading provide ways of writing tuples to multiple
 output files based on key, but the plain RDD interface doesn't seem to, and
 it should.*

 I have been looking into ways to write to multiple outputs in Spark. It
 seems like a feature that is somewhat missing from Spark.

 The idea is to partition output and write the elements of an RDD to
 different locations based on the key. For example, in a pair RDD your key
 may be (language, date, userId) and you would like to write separate files
 to $someBasePath/$language/$date. Then there would be a version of
 saveAsHadoopDataset that would be able to write to multiple locations based
 on key using the underlying OutputFormat. Perhaps it would take a pair RDD
 with keys ($partitionKey, $realKey), so for example ((language, date),
 userId).

 The prior art I have found on this is the following.

 Using SparkSQL:
 The 'partitionBy' method of DataFrameWriter:
 https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

 This only works for parquet at the moment.

 Using Spark/Hadoop:
 This pull request (using the hadoop1 API):
 https://github.com/apache/spark/pull/4895/files

 This uses MultipleTextOutputFormat (which in turn uses
 MultipleOutputFormat), which is part of the old hadoop1 API. It only works
 for text but could be generalised for any underlying OutputFormat by using
 MultipleOutputFormat (but only for hadoop1, which doesn't support
 ParquetAvroOutputFormat, for example).

 This gist (with the hadoop2 API):
 https://gist.github.com/mlehman/df9546f6be2e362bbad2

 This uses MultipleOutputs (available for both the old and new hadoop
 APIs) and extends saveAsNewAPIHadoopDataset to support multiple outputs.
 It should work for any underlying OutputFormat. It would probably be better
 implemented by extending saveAs[NewAPI]HadoopDataset.

 In Cascading:
 Cascading provides PartitionTap:
 http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/local/PartitionTap.html
 to do this.

 So my questions are: is there a reason why Spark doesn't provide this?
 Does Spark provide similar functionality through some other mechanism? How
 would it be best implemented?

 Since I started composing this message I've had a go at writing a wrapper
 OutputFormat that writes multiple outputs using hadoop MultipleOutputs but
 doesn't require modification of PairRDDFunctions. The principle is similar,
 however. Again, it feels slightly hacky to use dummy fields for the
 ReduceContextImpl, but some of this may be part of the impedance mismatch
 between Spark and plain Hadoop... Here is my attempt:
 https://gist.github.com/silasdavis/d1d1f1f7ab78249af462

 I'd like to see this functionality in Spark somehow, but I invite suggestions
 on how best to achieve it.

 Thanks,
 Silas