Re: Writing to multiple outputs in Spark
@Reynold Xin: not really: it only works for Parquet (see partitionBy: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter), it requires you to have a DataFrame in the first place (for my use case the Spark SQL interface to Avro records is more of a hindrance than a help, since I want to use generated Java classes rather than treat Avro records as generic tables of Rows), and even if I do have a DataFrame, if I need to map or mapPartitions I lose that interface and have to create a new DataFrame from an RDD[Row], which isn't very convenient or efficient.

Has anyone been able to take a look at my gist: https://gist.github.com/silasdavis/d1d1f1f7ab78249af462? The first 100 lines provide a base class for multiple-outputs OutputFormats; see line 269 for an example of how to use such an OutputFormat: https://gist.github.com/silasdavis/d1d1f1f7ab78249af462#file-multipleoutputs-scala-L269

@Alex Angelini, the code there would support your use case without modifying Spark (it uses saveAsNewAPIHadoopFile and a multiple-outputs wrapper format).

@Nicholas Chammas, I'll post a link to my gist on that ticket.

On Fri, 14 Aug 2015 at 21:10 Reynold Xin r...@databricks.com wrote:

This is already supported with the new partitioned data sources in DataFrame/SQL, right?

On Fri, Aug 14, 2015 at 8:04 AM, Alex Angelini alex.angel...@shopify.com wrote:

Speaking about Shopify's deployment, this would be a really nice feature to have. We would like to write data to folders with the structure `year/month/day` but have had to hold off on that because of the lack of support for MultipleOutputs.

On Fri, Aug 14, 2015 at 10:56 AM, Silas Davis si...@silasdavis.net wrote:

Would it be right to assume that the silence on this topic implies others don't really have this issue/desire?

On Sat, 18 Jul 2015 at 17:24 Silas Davis si...@silasdavis.net wrote:

tl;dr: hadoop and cascading provide ways of writing tuples to multiple output files based on key, but the plain RDD interface doesn't seem to, and it should.

I have been looking into ways to write to multiple outputs in Spark. It seems like a feature that is somewhat missing from Spark. The idea is to partition output and write the elements of an RDD to different locations depending on the key. For example, in a pair RDD your key may be (language, date, userId) and you would like to write separate files to $someBasePath/$language/$date. Then there would be a version of saveAsHadoopDataset that would be able to write to multiple locations based on the key, using the underlying OutputFormat. Perhaps it would take a pair RDD with keys ($partitionKey, $realKey), so for example ((language, date), userId).

The prior art I have found on this is the following.

Using Spark SQL: the 'partitionBy' method of DataFrameWriter: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrameWriter. This only works for Parquet at the moment.

Using Spark/Hadoop: this pull request (using the hadoop1 API): https://github.com/apache/spark/pull/4895/files. This uses MultipleTextOutputFormat (which in turn extends MultipleOutputFormat), part of the old hadoop1 API. It only works for text, but could be generalised to any underlying OutputFormat by using MultipleOutputFormat (though only for hadoop1, which doesn't support ParquetAvroOutputFormat, for example).

This gist (with the hadoop2 API): https://gist.github.com/mlehman/df9546f6be2e362bbad2. This uses MultipleOutputs (available for both the old and new hadoop APIs) and extends saveAsNewHadoopDataset to support multiple outputs. It should work for any underlying OutputFormat. Probably better implemented by extending saveAs[NewAPI]HadoopDataset.

In Cascading: Cascading provides PartitionTap (http://docs.cascading.org/cascading/2.5/javadoc/cascading/tap/local/PartitionTap.html) to do this.

So my questions are: is there a reason why Spark doesn't provide this? Does Spark provide similar functionality through some other mechanism? How would it be best implemented?

Since I started composing this message I've had a go at writing a wrapper OutputFormat that writes multiple outputs using hadoop MultipleOutputs but doesn't require modification of PairRDDFunctions. The principle is similar, however. Again, it feels slightly hacky to use dummy fields for the ReduceContextImpl, but some of this may be part of the impedance mismatch between Spark and plain Hadoop... Here is my attempt: https://gist.github.com/silasdavis/d1d1f1f7ab78249af462

I'd like to see this functionality in Spark somehow, but invite suggestions on how best to achieve it.

Thanks,
Silas
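For concreteness, a minimal sketch of the hadoop1 MultipleTextOutputFormat approach described above might look like the following. The class and helper names here are illustrative rather than taken from the pull request, and the RDD is assumed to be keyed by ((language, date), userId) as in the example:

```scala
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.rdd.RDD

// Routes each record to a file under a subdirectory derived from its key,
// e.g. a pair keyed by "en/2015-08-14" lands in $basePath/en/2015-08-14/part-NNNNN.
class KeyPathTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    s"$key/$name"
  // Suppress the routing key so only the value appears in the output file.
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
}

// Hypothetical helper: flatten the ((language, date), userId) key into a path.
def saveByKey(rdd: RDD[((String, String), String)], basePath: String): Unit =
  rdd.map { case ((language, date), userId) => (s"$language/$date", userId) }
    .saveAsHadoopFile(basePath, classOf[String], classOf[String],
      classOf[KeyPathTextOutputFormat])
```

As the thread notes, this route is limited to text output; binary formats such as ParquetAvroOutputFormat would need the hadoop2 MultipleOutputs approach instead.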
Re: Writing to multiple outputs in Spark
A workaround would be to make multiple passes over the RDD, with each pass writing its own output. Or do it in a single pass with foreachPartition (opening multiple files per partition to write out)?

-Abhishek-

On Aug 14, 2015, at 7:56 AM, Silas Davis si...@silasdavis.net wrote:

Would it be right to assume that the silence on this topic implies others don't really have this issue/desire?
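A rough sketch of the single-pass foreachPartition workaround, assuming the executors can write directly to a path on a shared filesystem (a real job would more likely go through the Hadoop FileSystem API); the record layout and all names are illustrative:

```scala
import java.io.PrintWriter
import java.nio.file.{Files, Paths}
import java.util.UUID
import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Writes each ((language, date), userId) record under basePath/language/date,
// opening one writer per distinct directory seen within the partition.
def writePartitioned(rdd: RDD[((String, String), String)], basePath: String): Unit =
  rdd.foreachPartition { records =>
    val writers = mutable.Map.empty[String, PrintWriter]
    try {
      records.foreach { case ((language, date), userId) =>
        val dir = Paths.get(basePath, language, date)
        Files.createDirectories(dir)
        val writer = writers.getOrElseUpdate(dir.toString, new PrintWriter(
          Files.newBufferedWriter(dir.resolve(s"part-${UUID.randomUUID()}"))))
        writer.println(userId)
      }
    } finally writers.values.foreach(_.close())
  }
```

The cost is either multiple scans (the first variant) or doing your own file management and cleanup outside the OutputFormat machinery (this variant), which is exactly what the MultipleOutputs wrapper tries to avoid.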
Re: Writing to multiple outputs in Spark
This is already supported with the new partitioned data sources in DataFrame/SQL, right?

On Fri, Aug 14, 2015 at 8:04 AM, Alex Angelini alex.angel...@shopify.com wrote:

Speaking about Shopify's deployment, this would be a really nice feature to have. We would like to write data to folders with the structure `year/month/day` but have had to hold off on that because of the lack of support for MultipleOutputs.

On Fri, Aug 14, 2015 at 10:56 AM, Silas Davis si...@silasdavis.net wrote:

Would it be right to assume that the silence on this topic implies others don't really have this issue/desire?
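For reference, the DataFrame route being referred to looks roughly like this in Spark 1.4+; the paths and column names are made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("partitionBy-sketch"))
val sqlContext = new SQLContext(sc)

// Assumes the input rows carry year/month/day columns.
val events = sqlContext.read.json("/data/events.json")
events.write
  .partitionBy("year", "month", "day") // one directory level per column: .../year=2015/month=8/day=14/
  .parquet("/data/events_by_day")
```

As the thread notes, at 1.4 this only works for the Parquet sink, and it requires staying in the DataFrame API end to end.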
Re: Writing to multiple outputs in Spark
See: https://issues.apache.org/jira/browse/SPARK-3533

Feel free to comment there and make a case if you think the issue should be reopened.

Nick

On Fri, Aug 14, 2015 at 11:11 AM Abhishek R. Singh abhis...@tetrationanalytics.com wrote:

A workaround would be to have multiple passes on the RDD and each pass write its own output? Or in a foreachPartition do it in a single pass (open up multiple files per partition to write out)?

-Abhishek-