[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15214577#comment-15214577 ]

Nicholas Chammas commented on SPARK-3533:
-----------------------------------------

I've added 2 workarounds for this issue to the description body.

> Add saveAsTextFileByKey() method to RDDs
>
> Key: SPARK-3533
> URL: https://issues.apache.org/jira/browse/SPARK-3533
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, Spark Core
> Affects Versions: 1.1.0
> Reporter: Nicholas Chammas
>
> Users often have a single RDD of key-value pairs that they want to save to
> multiple locations based on the keys.
> For example, say I have an RDD like this:
> {code}
> >>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0])
> >>> a.collect()
> [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]
> >>> a.keys().distinct().collect()
> ['B', 'F', 'N']
> {code}
> Now I want to write the RDD out to different paths depending on the keys, so
> that I have one output directory per distinct key. Each output directory
> could potentially have multiple {{part-}} files, one per RDD partition.
> So the output would look something like:
> {code}
> /path/prefix/B [/part-1, /part-2, etc]
> /path/prefix/F [/part-1, /part-2, etc]
> /path/prefix/N [/part-1, /part-2, etc]
> {code}
> Though it may be possible to do this with some combination of
> {{saveAsNewAPIHadoopFile()}}, {{saveAsHadoopFile()}}, and the
> {{MultipleTextOutputFormat}} output format class, it isn't straightforward.
> It's not clear if it's even possible at all in PySpark.
> Please add a {{saveAsTextFileByKey()}} method or something similar to RDDs
> that makes it easy to save RDDs out to multiple locations at once.
> ---
> Update: March 2016
> There are two workarounds to this problem:
> 1. See [this answer on Stack Overflow|http://stackoverflow.com/a/26051042/877069], which implements {{MultipleTextOutputFormat}}. (Scala-only)
> 2. See [this comment by Davies Liu|https://github.com/apache/spark/pull/8375#issuecomment-202458325], which uses DataFrames:
> {code}
> val df = rdd.map(t => (gen_key(t), t)).toDF("key", "text")
> df.write.partitionBy("key").text(path)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
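For illustration, the layout requested in the description can be simulated locally in plain Python, without Spark. This is a minimal sketch under stated assumptions: `save_as_text_file_by_key` is a hypothetical helper, a list of lists stands in for RDD partitions, and the `part-NNNNN` names are a simplification of Hadoop-style part-file naming. The `hive_style` flag shows the `key=B` directory naming that `DataFrameWriter.partitionBy` produces, which differs from the `/path/prefix/B` layout asked for in the description.

```python
import os
import tempfile


def save_as_text_file_by_key(partitions, prefix, hive_style=False, column="key"):
    """Write each value to <prefix>/<key>/part-NNNNN (one file per partition and key).

    Hypothetical helper: `partitions` is a list of lists of (key, value) pairs
    standing in for RDD partitions. Only the value is written. With
    hive_style=True the directory is named "<column>=<key>" instead, mimicking
    the layout produced by DataFrameWriter.partitionBy.
    """
    for index, partition in enumerate(partitions):
        handles = {}  # key -> open file for this simulated task
        try:
            for key, value in partition:
                if key not in handles:
                    dirname = f"{column}={key}" if hive_style else str(key)
                    directory = os.path.join(prefix, dirname)
                    os.makedirs(directory, exist_ok=True)
                    path = os.path.join(directory, f"part-{index:05d}")
                    handles[key] = open(path, "w", encoding="utf-8")
                handles[key].write(f"{value}\n")
        finally:
            for handle in handles.values():
                handle.close()


# The description's example, split into two hypothetical partitions.
partitions = [
    [("N", "Nick"), ("N", "Nancy"), ("B", "Bob")],
    [("B", "Ben"), ("F", "Frankie")],
]

prefix = tempfile.mkdtemp()
save_as_text_file_by_key(partitions, prefix)

hive_prefix = tempfile.mkdtemp()
save_as_text_file_by_key(partitions, hive_prefix, hive_style=True)

for root, _, files in sorted(os.walk(prefix)):
    for name in sorted(files):
        print(os.path.relpath(os.path.join(root, name), prefix))
```

Each simulated task writes at most one file per key it sees, so the number of part files under a key directory equals the number of partitions that contain that key.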
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14952860#comment-14952860 ]

Alexander Betaev commented on SPARK-3533:
-----------------------------------------

I'd suggest [this|http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job] solution as a workaround.
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711291#comment-14711291 ]

Jason Hubbard commented on SPARK-3533:
-----------------------------------------

Spark SQL can already write to multiple file locations (SPARK-3007). I'm not recommending converting your RDD to a DataFrame just to write to multiple locations, but it might be beneficial for both to share the same mechanism. One current limitation of the Spark SQL implementation is that each split opens a new writer for every Hive partition it contains. If many Hive partitions are spread across the splits, this produces many small files and may degrade performance because of memory usage.
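The small-files concern can be quantified with a short plain-Python sketch. It assumes, as described in the comment, that each task opens one writer (and therefore writes one file) per distinct key it contains. `count_output_files` and `partition_by_key` are hypothetical helpers, and Python's built-in `hash` stands in for Spark's partitioner. Grouping records by key before writing, as `RDD.partitionBy` or repartitioning on the key column would do in Spark, reduces the count to one file per key.

```python
def count_output_files(partitions):
    """Files written if every task opens one writer per distinct key it holds."""
    return sum(len({key for key, _ in partition}) for partition in partitions)


def partition_by_key(partitions, num_partitions):
    """Move every record to the partition chosen by hashing its key.

    Python's hash() stands in for Spark's partitioner here; string hashes vary
    between interpreter runs, but every record of a given key always lands in
    the same output partition within one run.
    """
    buckets = [[] for _ in range(num_partitions)]
    for partition in partitions:
        for key, value in partition:
            buckets[hash(key) % num_partitions].append((key, value))
    return buckets


# Hypothetical input: 4 tasks, each holding two records for every key B, F, N.
keys = ["B", "F", "N"]
scattered = [
    [(key, f"{key}-{task}-{i}") for key in keys for i in range(2)]
    for task in range(4)
]
grouped = partition_by_key(scattered, num_partitions=4)

print("files before grouping:", count_output_files(scattered))
print("files after grouping:", count_output_files(grouped))
```

This prints 12 files before grouping (4 tasks times 3 keys) and 3 after. The trade-off is an extra shuffle, and a single very large key then lands entirely in one task.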
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14708209#comment-14708209 ]

Apache Spark commented on SPARK-3533:
-----------------------------------------

User 'saurfang' has created a pull request for this issue:
https://github.com/apache/spark/pull/8375
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705446#comment-14705446 ]

Nicholas Chammas commented on SPARK-3533:
-----------------------------------------

{quote}
Nicholas Chammas Have you been able to take a look at the code?
{quote}

I'm unfortunately not in a good position to review the code and give the appropriate feedback. Someone like [~davies] or [~srowen] may be able to do that, but I can't speak for their availability.

{quote}
I'm not sure if you're suggesting it would be better to make a pull request now, or whether the gist is sufficient. I will open a pull request if you prefer. Is there anything else I should be doing to get committer buy-in?
{quote}

As a fellow contributor, I'm just advising that committer buy-in is essential to getting a feature like this landed. To get that, you may need to risk offering up a full solution knowing that it may be rejected or require many changes before acceptance.

An alternative would be to get some pre-approval for the idea and guidance from a committer (perhaps via the dev list) before crafting a full solution. However, if the feature is not already a priority for some committer, this is unlikely to happen.

I'm not sure what the right way to go is, but those are your options, realistically.
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705278#comment-14705278 ]

Silas Davis commented on SPARK-3533:
-----------------------------------------

I've looked at various solutions and summarised what I found in my post here: http://apache-spark-developers-list.1001551.n3.nabble.com/Writing-to-multiple-outputs-in-Spark-td13298.html. The linked Stack Overflow question only addresses multiple text outputs, and only for Hadoop 1. My code combines the idea of using a wrapping OutputFormat with that of another gist, which uses MultipleOutputs but modifies saveAsNewAPIHadoopFile. My code also makes do with the current Spark API, but it took enough effort, and the aim seems common enough, that I'd argue some of it should be moved into Spark itself.

As for showing some code, my implementation is in the gist I posted, and I have added it to the links attached to this ticket. I was hoping to get some comments on the code before embarking on a full pull request, which would require more consideration of where to place files, etc. I'm not sure if you're suggesting it would be better to make a pull request now, or whether the gist is sufficient. I will open a pull request if you prefer. Is there anything else I should be doing to get committer buy-in?

[~nchammas] Have you been able to take a look at the code?
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14705182#comment-14705182 ]

Nicholas Chammas commented on SPARK-3533:
-----------------------------------------

No need to open a separate ticket if your proposal is closely related to and satisfies the original intent of this one.

More important is getting committer buy-in for your idea and showing some code. (Doing the latter first may help immensely with the former, in fact, but there's a risk the effort will still be rejected.)

There are already other solutions out there (several on Stack Overflow, linked from here) that make do with Spark's current API. This proposal should focus on figuring out which parts of those solutions can and should go into Spark core.
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704939#comment-14704939 ]

Silas Davis commented on SPARK-3533:
-----------------------------------------

Another possibility is not to use Hadoop's MultipleOutputs machinery at all; Cascading doesn't, for example. Whether to take that route depends on how much MultipleOutputs buys us (I haven't looked much into its implementation or what it does), and on how much of a code smell it is to set up the dummy ReduceContextImpl here: https://gist.github.com/silasdavis/d1d1f1f7ab78249af462#file-multipleoutputs-scala-L59
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704923#comment-14704923 ]

Silas Davis commented on SPARK-3533:
-----------------------------------------

[~nchammas] I don't have implementations for Python or Java, other than that they could use the same OutputFormat to write multiple outputs with the current Spark API; however, I'd be willing to try to put something together. At this stage, though, I think a PR might be a bit premature: what I wrote deliberately works without changing existing Spark code, but my feeling is that a more elegant solution could be reached by moving some of the code in MultipleOutputsFormat into PairRDDFunctions.

Has anyone had a chance to grok what I'm doing in the gist? Would it be a good idea to parameterise saveAsNewAPIHadoopDataset so that it can use MultipleOutputs? I might see if I can work out something sensible along these lines, and we can compare the approaches.

I've also only just noticed that this ticket refers to saveAsTextFileByKey. In my gist you can see that I also have variants, saveAsMultipleAvroFiles and saveAsMultipleParquetFiles, that use the same approach. We don't have to include these specific helpers, but I think we should generalise this to multiple outputs for any OutputFormat. Can we expand this ticket, or should I open a new one?

[~saurfang] With reference to the above, I think it would be better to provide a solution for any type of multiple outputs, not just text, as they lend themselves to a unified approach and we might as well kill n birds with one stone. Also, Hadoop 2 has no equivalent of MultipleTextOutputFormat, whereas Hadoop 1 does have a MultipleOutputs class that seems largely similar, so we can use the same approach for Hadoop 1 and 2 if it is based on MultipleOutputs. So my personal opinion is that we should not put together a PR based on MultipleTextOutputFormat. I would welcome your assistance on a more general PR, or comments on the approach, though.
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14703626#comment-14703626 ]

Sen Fang commented on SPARK-3533:
-----------------------------------------

I also have a working implementation in Scala based on Hadoop 1 + MultipleTextOutputFormat. [~silasdavis], let me know if you have time to put up a PR. If not, I can put one together this weekend.
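The role of the two `MultipleTextOutputFormat` hooks that Scala workarounds built on this class typically override, `generateFileNameForKeyValue` and `generateActualKey`, can be mimicked in plain Python. This sketch assumes one common choice, returning `key + "/" + name` for the file name (one directory per key, one part file per task) and a null actual key; the linked Stack Overflow answers may differ in detail. The Python functions only mirror the Hadoop hook names, and `route` and `format_line` are hypothetical helpers. `format_line` mimics how Hadoop's `TextOutputFormat` writes `key<TAB>value` unless the key is null, which is why the key is dropped.

```python
def generate_file_name_for_key_value(key, value, name):
    """Mirror of the Hadoop hook: put each record under a directory named after its key."""
    return f"{key}/{name}"


def generate_actual_key(key, value):
    """Mirror of the Hadoop hook: None plays the role of NullWritable, dropping the key."""
    return None


def format_line(key, value, separator="\t"):
    """Mimic TextOutputFormat: write only the value for a null key, else key<sep>value."""
    if key is None:
        return f"{value}\n"
    return f"{key}{separator}{value}\n"


def route(records, name="part-00000", drop_key=True):
    """Hypothetical helper: group one task's formatted lines by target file path."""
    files = {}
    for key, value in records:
        path = generate_file_name_for_key_value(key, value, name)
        actual_key = generate_actual_key(key, value) if drop_key else key
        files[path] = files.get(path, "") + format_line(actual_key, value)
    return files


records = [("N", "Nick"), ("N", "Nancy"), ("B", "Bob")]
print(route(records))                  # keys select the file only
print(route(records, drop_key=False))  # keys would also appear in each line
```

Without the `generateActualKey` override, each output line would carry a redundant key prefix even though the key is already encoded in the directory name.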
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699613#comment-14699613 ]

Nicholas Chammas commented on SPARK-3533:
-----------------------------------------

[~silasdavis] - If you already have a working implementation that covers at least the Python, Java, and Scala APIs, then I suggest opening a PR to get detailed feedback.

Is there anyone watching this JIRA who would be willing to shepherd a PR to solve it? Apart from having a working PR, we will need people (especially committers) to review and critique the approach for it to be accepted.

By the way, Silas, there was a [previous attempt|https://github.com/apache/spark/pull/4895] at solving this issue that was closed by the author because he could not get it to work with Python. You might want to take a quick look at that.
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699504#comment-14699504 ]

Silas Davis commented on SPARK-3533:
-----------------------------------------

I'd like to suggest that this be reopened: writing partitions of a dataset to separate files based on keys is a common use case, supported by Hadoop and Cascading, for example. DataFrameWriter has partitionBy, but it is only supported for Parquet and does not cover cases where you want to work with plain RDDs (for example, when using specific Avro classes, or when transforming with map, mapPartitions, or combineByKey, which takes you out of DataFrame land).

I have a working implementation based on the Hadoop 2+ MultipleOutputs class. The basic idea is to wrap an underlying OutputFormat in an OutputFormat class that derives from a MultipleOutputsFormat class, which maintains an instance of MultipleOutputs for writing out records based on their key. Here is the gist:

https://gist.github.com/silasdavis/d1d1f1f7ab78249af462

I've included tests and helper functions for completeness, but the meat of the implementation is the first 100 lines. You can also see how it's meant to be used by looking at the saveAsMultipleAvroFiles code:

https://gist.github.com/silasdavis/d1d1f1f7ab78249af462#file-multipleoutputs-scala-L287

It would be useful to get some comments on the general idea. I've tried to use as much of the Hadoop machinery as possible, as PairRDDFunctions does. This means no existing Spark code needed to change, but a similar approach could be used to incorporate MultipleOutputs into the saveAsNewAPIHadoopDataset method.
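The wrapping design described in this comment can be illustrated without Hadoop. The plain-Python sketch below does not use or reproduce Hadoop's `MultipleOutputs` API; all class and function names are hypothetical, and the sample records are made up. A router keeps one lazily created delegate writer per key, roughly as the described `MultipleOutputsFormat` keeps a `MultipleOutputs` instance, and closes every delegate when the task finishes. Because the delegate comes from a factory, the same router serves any record format, which is the generalization from text-only to arbitrary output formats discussed on this ticket.

```python
import csv
import io


class KeyedOutputWriter:
    """Route each (key, record) pair to a delegate writer dedicated to that key.

    Hypothetical analogue of a wrapping output format: `open_delegate(key)` is
    any factory returning an object with write(record) and close(), so the
    router itself is independent of the record format.
    """

    def __init__(self, open_delegate):
        self._open_delegate = open_delegate
        self._delegates = {}  # key -> delegate, opened on first use

    def write(self, key, record):
        if key not in self._delegates:
            self._delegates[key] = self._open_delegate(key)
        self._delegates[key].write(record)

    def close(self):
        # Close every delegate when the (simulated) task finishes.
        for delegate in self._delegates.values():
            delegate.close()
        self._delegates.clear()


class TextLineWriter:
    """Stand-in for a text record writer: one line per record."""

    def __init__(self, buffer):
        self._buffer = buffer

    def write(self, record):
        self._buffer.write(f"{record}\n")

    def close(self):
        pass  # a real writer would flush and commit its file here


class CsvRowWriter:
    """Stand-in for a different output format: one CSV row per record."""

    def __init__(self, buffer):
        self._writer = csv.writer(buffer, lineterminator="\n")

    def write(self, record):
        self._writer.writerow(record)

    def close(self):
        pass


outputs = {}  # in-memory "file system": path -> StringIO


def opener(writer_cls, prefix):
    """Build a factory that opens one in-memory file per key under `prefix`."""
    def open_delegate(key):
        path = f"{prefix}/{key}/part-00000"
        outputs[path] = io.StringIO()
        return writer_cls(outputs[path])
    return open_delegate


text_out = KeyedOutputWriter(opener(TextLineWriter, "text"))
for key, name in [("N", "Nick"), ("B", "Bob"), ("N", "Nancy")]:
    text_out.write(key, name)
text_out.close()

csv_out = KeyedOutputWriter(opener(CsvRowWriter, "csv"))
for key, row in [("B", ("Ben", 3)), ("F", ("Frankie", 7))]:
    csv_out.write(key, row)
csv_out.close()

for path in sorted(outputs):
    print(path, repr(outputs[path].getvalue()))
```

Keeping every delegate open until `close()` is also why memory use grows with the number of distinct keys a task sees, the same limitation raised about the Spark SQL writer.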
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14347678#comment-14347678 ] Apache Spark commented on SPARK-3533: - User 'ilganeli' has created a pull request for this issue: https://github.com/apache/spark/pull/4895
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14347216#comment-14347216 ] Ilya Ganelin commented on SPARK-3533: - [~aaronjosephs] - Let me see if that's it. Thanks!
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14347155#comment-14347155 ] Aaron commented on SPARK-3533: -- [~ilganeli] FYI, I'm pretty sure the `()` means a missing no-args constructor, not a missing `init()` method.
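A rough Python analogy of this failure mode (a sketch, not the Hadoop code): the stack trace Ilya posted goes through Hadoop's `ReflectionUtils.newInstance`, which creates the output format through its no-argument constructor, so a class whose only constructor takes an argument cannot be created that way. One plausible cause, assumed here rather than confirmed in this thread, is that `PairRDDFunctions$RDDMultipleTextOutputFormat` is declared inside the `PairRDDFunctions` class, which makes its JVM constructor implicitly take the enclosing instance. Declaring the format as a top-level class would avoid that. `Outer`, `TopLevel`, and `new_instance` are hypothetical stand-ins.

```python
class Outer:
    """Stands in for an enclosing class such as PairRDDFunctions (hypothetical)."""

    class Inner:
        # Mimics a JVM inner class: its only constructor needs the enclosing instance.
        def __init__(self, outer: "Outer") -> None:
            self.outer = outer


class TopLevel:
    """Mimics a top-level class that has a no-argument constructor."""

    def __init__(self) -> None:
        self.ready = True


def new_instance(cls):
    """Toy reflective factory that, like the Hadoop call, only tries a zero-arg constructor."""
    try:
        return cls()
    except TypeError as exc:
        raise RuntimeError(f"no zero-argument constructor for {cls.__qualname__}") from exc
```

Here `new_instance(TopLevel)` succeeds, while `new_instance(Outer.Inner)` fails for the same structural reason as the reported exception.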
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14343387#comment-14343387 ] Ilya Ganelin commented on SPARK-3533: - Hey [~aaronjosephs], please feel free. I'm out of ideas for this one. You can see my code changes at the github link and the issue I ran into here. Thanks.
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14343353#comment-14343353 ] Aaron commented on SPARK-3533: -- [~ilganeli] Hey, are you still working on this? It seems like a valuable contribution. I wouldn't mind picking it up if you fill me in on the issues.
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14265122#comment-14265122 ] Ilya Ganelin commented on SPARK-3533: - Hi all - I have that solution (using MultipleTextOutputFormat) implemented, but sadly it doesn't work out of the box:

saveAsHadoopFileByKey should generate a text file per key *** FAILED ***
java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.spark.rdd.PairRDDFunctions$RDDMultipleTextOutputFormat.<init>()
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)

Adding an init method to the definition does not help either, so I'm still digging into other options. My code is here: https://github.com/ilganeli/spark/tree/SPARK-3533
I'll keep looking for alternatives.
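For reference, the MultipleTextOutputFormat approach relies on two hooks of the old-API `org.apache.hadoop.mapred.lib.MultipleOutputFormat`: `generateFileNameForKeyValue(key, value, name)`, which by default returns the leaf file name unchanged, and `generateActualKey(key, value)`, which by default returns the key unchanged. The Python sketch below only mimics what a typical override does: it places each record under a directory named after its key and drops the key so that only the value is written. The snake_case functions are hypothetical stand-ins, not Hadoop APIs. The sketch also mimics `TextOutputFormat`'s line layout (key, tab, value, with the key omitted when it is null).

```python
from typing import Dict, Iterable, List, Optional, Tuple


def generate_file_name_for_key_value(key: str, value: str, name: str) -> str:
    # Typical override: the key becomes a subdirectory of the output path.
    return f"{key}/{name}"


def generate_actual_key(key: str, value: str) -> Optional[str]:
    # Returning None plays the role of NullWritable: the key is not written.
    return None


def format_line(key: Optional[str], value: str, sep: str = "\t") -> str:
    # TextOutputFormat-style line: "key<TAB>value", or just the value for a null key.
    return value if key is None else f"{key}{sep}{value}"


def route_partition(records: Iterable[Tuple[str, str]], name: str) -> Dict[str, List[str]]:
    """Group one task's records by the relative file each would be written to."""
    files: Dict[str, List[str]] = {}
    for key, value in records:
        path = generate_file_name_for_key_value(key, value, name)
        line = format_line(generate_actual_key(key, value), value)
        files.setdefault(path, []).append(line)
    return files
```

For the issue's example, a task named `part-00000` that holds both `N` and `B` records would produce `N/part-00000` and `B/part-00000`, each containing only the values.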
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14250227#comment-14250227 ] Nicholas Chammas commented on SPARK-3533: - [~alilee] Do you mean [this answer|http://stackoverflow.com/a/26051042/877069]? I haven't tested it either, but it looks promising. What's the intended use case for a feature like {{PairRDD.splitByKey()}} returning a list of RDDs?
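The semantics of the proposed `splitByKey` can be sketched locally in plain Python. `split_by_key` is hypothetical: it groups values into one collection per key. On an actual pair RDD, returning separate RDDs would presumably mean one `filter` per distinct key, for example `[rdd.filter(lambda kv, k=k: kv[0] == k) for k in keys]`, so every split rescans the parent RDD unless the parent is cached first.

```python
from typing import Dict, Hashable, Iterable, List, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def split_by_key(pairs: Iterable[Tuple[K, V]]) -> Dict[K, List[V]]:
    """Local stand-in for a hypothetical PairRDD.splitByKey(): one collection per key."""
    splits: Dict[K, List[V]] = {}
    for key, value in pairs:
        # Preserve the original order of values within each key.
        splits.setdefault(key, []).append(value)
    return splits
```

With separate per-key RDDs, each split could then be written with its own `saveAsTextFile` call, which is one way to get one directory per key.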
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249752#comment-14249752 ] Alister Lee commented on SPARK-3533: The Stack Overflow question has a good answer (I haven't tested it) using MultipleTextOutputFormat and PairRDD.partitionBy. Would a more generally useful feature be a PairRDD.splitByKey that returns RDD[], along the lines of .randomSplit(double[] weights)?
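The reason `partitionBy` matters in that answer can be simulated locally. In this sketch, `partition_by` and `output_files` are hypothetical helpers, and Python's built-in `hash` is a simplified stand-in for Spark's hash partitioner. Once every record for a key sits in a single partition, a key-routing output format creates exactly one part file per key directory. A key spread across N partitions can end up with as many as N part files.

```python
from typing import Dict, List, Set, Tuple

Pair = Tuple[str, str]


def partition_by(pairs: List[Pair], num_partitions: int) -> List[List[Pair]]:
    """Hash-partition pairs so that all records with the same key share a partition."""
    partitions: List[List[Pair]] = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


def output_files(partitions: List[List[Pair]]) -> Dict[str, Set[str]]:
    """Map each key directory to the part files a key-routing format would create."""
    files: Dict[str, Set[str]] = {}
    for index, records in enumerate(partitions):
        for key, _ in records:
            files.setdefault(key, set()).add(f"part-{index:05d}")
    return files
```

Without the partitioning step, for example with records dealt round-robin across two partitions, the same key can show up in both `part-00000` and `part-00001`.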
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14242913#comment-14242913 ] Ilya Ganelin commented on SPARK-3533: - I am looking into a solution for this.
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14143308#comment-14143308 ] Nicholas Chammas commented on SPARK-3533: - [~pwendell] / [~davies] - Is there any chance we can slate this feature request for the 1.2.0 release? Or, as an alternative, document a way to do something equivalent with the existing API?
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14135597#comment-14135597 ] Nicholas Chammas commented on SPARK-3533: - [~kzhang] - I noticed you authored these two PySpark examples, which seem related to the use case described here in their use of format classes: [Avro Input Format|https://github.com/apache/spark/blob/cc14644460872efb344e8d895859d70213a40840/examples/src/main/python/avro_inputformat.py#L73] and [HBase Output Format|https://github.com/apache/spark/blob/cc14644460872efb344e8d895859d70213a40840/examples/src/main/python/hbase_outputformat.py#L60]. Do you have any pointers on how to do something like {{saveAsTextFileByKey()}} in PySpark using the existing API?
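One pure-PySpark workaround avoids JVM output format classes entirely, but it rests on strong assumptions. Each partition writes its own records to `<prefix>/<key>/part-<index>` with ordinary Python file I/O. `write_partition_by_key` is a hypothetical helper shaped for `RDD.mapPartitionsWithIndex`. It assumes a filesystem that every executor can see at the same path, such as a shared mount. Because it bypasses Hadoop's output committer, a failed or speculative task attempt can leave partial or duplicate files behind.

```python
import os
from typing import Dict, Iterable, List, TextIO, Tuple


def write_partition_by_key(index: int, records: Iterable[Tuple[str, str]],
                           prefix: str) -> List[str]:
    """Write one partition's values to <prefix>/<key>/part-<index>; return the paths written."""
    handles: Dict[str, TextIO] = {}
    try:
        for key, value in records:
            handle = handles.get(key)
            if handle is None:
                # First record for this key in this partition: create its directory and file.
                key_dir = os.path.join(prefix, str(key))
                os.makedirs(key_dir, exist_ok=True)
                path = os.path.join(key_dir, f"part-{index:05d}")
                handle = open(path, "w", encoding="utf-8")
                handles[key] = handle
            handle.write(f"{value}\n")
    finally:
        # Always close every file this partition opened.
        for handle in handles.values():
            handle.close()
    return sorted(handle.name for handle in handles.values())
```

On a real cluster, it might be driven with `a.mapPartitionsWithIndex(lambda i, it: write_partition_by_key(i, it, "/mnt/shared/prefix")).count()`. The mount path is a placeholder, and `count()` only serves to trigger the job.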
[jira] [Commented] (SPARK-3533) Add saveAsTextFileByKey() method to RDDs
[ https://issues.apache.org/jira/browse/SPARK-3533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14134877#comment-14134877 ] Nicholas Chammas commented on SPARK-3533: - CC [~davies] and [~pwendell].