Re: Which is more efficient: first join three RDDs and then do filtering or vice versa?
Thanks, it makes sense.

On Thursday, March 12, 2015, Daniel Siegmann wrote:

> Join causes a shuffle (sending data across the network). I expect it will be better to filter before you join, so you reduce the amount of data which is sent across the network.
>
> Note this would be true for *any* transformation which causes a shuffle. It would not be true if you're combining RDDs with union, since that doesn't cause a shuffle.
>
> On Thu, Mar 12, 2015 at 11:04 AM, shahab wrote:
>
>> Hi,
>>
>> This question has probably been answered on the mailing list before, but I couldn't find it. Sorry for posting it again.
>>
>> I need to join and apply filtering on three different RDDs, and I wonder which of the following alternatives is more efficient:
>> 1- First join all three RDDs and then filter the resulting joined RDD, or
>> 2- Filter each individual RDD and then join the filtered RDDs.
>>
>> Or is there perhaps no difference, due to lazy evaluation and Spark's underlying optimisation?
>>
>> best,
>> /Shahab
Re: Which is more efficient: first join three RDDs and then do filtering or vice versa?
Join causes a shuffle (sending data across the network). I expect it will be better to filter before you join, so you reduce the amount of data which is sent across the network.

Note this would be true for *any* transformation which causes a shuffle. It would not be true if you're combining RDDs with union, since that doesn't cause a shuffle.

On Thu, Mar 12, 2015 at 11:04 AM, shahab wrote:

> Hi,
>
> This question has probably been answered on the mailing list before, but I couldn't find it. Sorry for posting it again.
>
> I need to join and apply filtering on three different RDDs, and I wonder which of the following alternatives is more efficient:
> 1- First join all three RDDs and then filter the resulting joined RDD, or
> 2- Filter each individual RDD and then join the filtered RDDs.
>
> Or is there perhaps no difference, due to lazy evaluation and Spark's underlying optimisation?
>
> best,
> /Shahab
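A minimal sketch of the filter-before-join advice, with hypothetical pair RDDs keyed by an Int id (an existing SparkContext `sc` is assumed):

import org.apache.spark.rdd.RDD

// Three hypothetical pair RDDs keyed by an Int id.
val a: RDD[(Int, Int)] = sc.parallelize(1 to 1000).map(x => (x % 100, x))
val b: RDD[(Int, Int)] = sc.parallelize(1 to 1000).map(x => (x % 100, x * 2))
val c: RDD[(Int, Int)] = sc.parallelize(1 to 1000).map(x => (x % 100, x * 3))

// Alternative 2 from the question: filter each RDD first, so the join's
// shuffle only moves the surviving records across the network.
val joined = a.filter { case (_, v) => v > 10 }
  .join(b.filter { case (_, v) => v > 10 })
  .join(c.filter { case (_, v) => v > 10 })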
Which is more efficient: first join three RDDs and then do filtering or vice versa?
Hi,

This question has probably been answered on the mailing list before, but I couldn't find it. Sorry for posting it again.

I need to join and apply filtering on three different RDDs, and I wonder which of the following alternatives is more efficient:
1- First join all three RDDs and then filter the resulting joined RDD, or
2- Filter each individual RDD and then join the filtered RDDs.

Or is there perhaps no difference, due to lazy evaluation and Spark's underlying optimisation?

best,
/Shahab
Re: Is there a limit to the number of RDDs in a Spark context?
Hi,

It's been some time since my last message on the subject of using many RDDs in a Spark job, but I have just encountered the same problem again. The thing is that I have an RDD of time-tagged data, which I want to 1) divide into windows according to a timestamp field; 2) compute KMeans for each time window; 3) collect the results in an RDD of pairs that contains the centroids for each time window.

For 1) I generate an RDD of pairs where the key is an id for the time window, but for 2) I hit the problem that KMeans from MLlib only accepts an RDD, and I cannot call it from aggregateByKey. I think this is a reusability problem for any algorithm in MLlib that is based on passing an RDD, if we want to apply the algorithm independently to several groups of data. So the only approaches I can imagine are:

a) Generate an RDD per time window, which is easy to do but doesn't work, because it's easy to end up with thousands of windows and hence thousands of RDDs, which freezes the Spark scheduler, as seen in my previous messages.

b) Collect the set of ids for the time windows in the driver, and traverse that set by generating an RDD per window, calling KMeans, and then storing the results with an export action. I will try that now, and I think it could work, because only one RDD per window will be present at the same time. The point here is that we avoid creating an RDD whose lineage depends on a thousand RDDs, as in the collecting phase 3) of a). But that implies a sequential execution of the KMeans computations, which is a waste of resources: imagine I have a cluster of 200 machines, each call to KMeans takes 10 minutes on 5 machines, and I have 1000 windows to compute, hence 1000 calls to KMeans; by sequencing the KMeans computations I would have 195 idle machines and a running time of 10 minutes * 1000 windows. Maybe this could be overcome by having not 1 RDD but m RDDs, for some number m that doesn't freeze the Spark scheduler, but I think that's not a very clean workaround.

Also, this makes it very difficult to reuse this computation of KMeans by window in a bigger program, because I'm not able to get an RDD with a key per window id and the centroids in the values. The only way I imagine I could do that is by storing the pairs in a database during the export actions, and then loading all the results into a single RDD, but I would prefer to do everything inside Spark if possible.

Maybe I'm missing something here; any idea would be appreciated.

Thanks in advance for your help,

Greetings,

Juan Rodriguez

2015-02-18 20:23 GMT+01:00 Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>:

> Hi Sean,
>
> Thanks a lot for your answer. That explains it, as I was creating thousands of RDDs, so I guess the communication overhead was the reason why the Spark job was freezing. After changing the code to use RDDs of pairs and aggregateByKey it works just fine, and quite fast.
>
> Again, thanks a lot for your help.
>
> Greetings,
>
> Juan
>
> 2015-02-18 15:35 GMT+01:00 Sean Owen :
>
>> At some level, enough RDDs creates hundreds of thousands of tiny partitions of data, each of which creates a task for each stage. The raw overhead of all the message passing can slow things down a lot. I would not design something to use an RDD per key. You would generally key by some value you want to divide and filter on, and then use a *ByKey to do your work.
>>
>> You don't work with individual RDDs this way, but usually that's good news. You usually have a lot more flexibility operating just in pure Java / Scala to do whatever you need inside your function.
>>
>> On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá wrote:
>>
>>> Hi Paweł,
>>>
>>> Thanks a lot for your answer. I finally got the program to work by using aggregateByKey, but I was wondering why creating thousands of RDDs doesn't work. I think that could be interesting for using methods that work on RDDs, like for example JavaDoubleRDD.stats() (http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29). If the groups are small then I can chain groupBy(), collect(), parallelize() and stats(), but that is quite inefficient, because it implies moving data to and from the driver, and also doesn't scale to big groups; on the other hand, if I use aggregateByKey or a similar function then I cannot use stats(), so I have to reimplement it. In general I was looking for a way to reuse other functions that I have that work on RDDs, for using them on groups
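A rough sketch of approach (b), the sequential per-window KMeans, assuming the input is an RDD of (windowId, feature vector) pairs; all names and parameters here are illustrative, and the centroids are simply collected into a driver-side Map rather than stored with an export action:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def kmeansPerWindow(data: RDD[(Long, Vector)], k: Int, iterations: Int): Map[Long, Array[Vector]] = {
  data.cache() // reused once per window below
  // Collect the distinct window ids in the driver ...
  val windowIds = data.keys.distinct().collect()
  // ... and run one (internally parallel) KMeans job per window, sequentially.
  windowIds.map { id =>
    val windowData = data.filter { case (wid, _) => wid == id }.values
    id -> KMeans.train(windowData, k, iterations).clusterCenters
  }.toMap
}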
Re: RDDs
The above is a great example using threads. Does anyone have an example using a Scala/Akka Future to do the same? I am looking for an example that uses an Akka Future and does something if the Future times out.

On Tue, Mar 3, 2015 at 9:16 AM, Manas Kar wrote:

> The above is a great example using threads.
> Does anyone have an example using a Scala/Akka Future to do the same?
> I am looking for an example that uses an Akka Future and does something if the Future times out.
>
> On Tue, Mar 3, 2015 at 7:00 AM, Kartheek.R wrote:
>
>> Hi TD,
>> "You can always run two jobs on the same cached RDD, and they can run in parallel (assuming you launch the 2 jobs from two different threads)"
>>
>> Is this a correct way to launch jobs from two different threads?
>>
>> val threadA = new Thread(new Runnable {
>>   def run() {
>>     for (i <- 0 until end) {
>>       val numAs = logData.filter(line => line.contains("a"))
>>       println("Lines with a: %s".format(numAs.count))
>>     }
>>   }
>> })
>>
>> val threadB = new Thread(new Runnable {
>>   def run() {
>>     for (i <- 0 until end) {
>>       val numBs = logData.filter(line => line.contains("b"))
>>       println("Lines with b: %s".format(numBs.count))
>>     }
>>   }
>> })
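A sketch using plain Scala Futures (Akka's Futures behave the same way for this purpose), assuming `logData` is a cached RDD[String] as in the quoted snippet; the 60-second timeout is arbitrary:

import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Launch the Spark action on another thread via a Future.
val countA: Future[Long] = Future {
  logData.filter(line => line.contains("a")).count()
}

// Wait at most 60 seconds, and react if the Future times out.
Try(Await.result(countA, 60.seconds)) match {
  case Success(n) => println("Lines with a: %s".format(n))
  case Failure(_: TimeoutException) => println("Job timed out")
  case Failure(e) => throw e
}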
Re: RDDs
The above is a great example using threads. Does anyone have an example using a Scala/Akka Future to do the same? I am looking for an example that uses an Akka Future and does something if the Future times out.

On Tue, Mar 3, 2015 at 7:00 AM, Kartheek.R wrote:

> Hi TD,
> "You can always run two jobs on the same cached RDD, and they can run in parallel (assuming you launch the 2 jobs from two different threads)"
>
> Is this a correct way to launch jobs from two different threads?
>
> val threadA = new Thread(new Runnable {
>   def run() {
>     for (i <- 0 until end) {
>       val numAs = logData.filter(line => line.contains("a"))
>       println("Lines with a: %s".format(numAs.count))
>     }
>   }
> })
>
> val threadB = new Thread(new Runnable {
>   def run() {
>     for (i <- 0 until end) {
>       val numBs = logData.filter(line => line.contains("b"))
>       println("Lines with b: %s".format(numBs.count))
>     }
>   }
> })
Re: RDDs
Hi TD,
"You can always run two jobs on the same cached RDD, and they can run in parallel (assuming you launch the 2 jobs from two different threads)"

Is this a correct way to launch jobs from two different threads?

val threadA = new Thread(new Runnable {
  def run() {
    for (i <- 0 until end) {
      val numAs = logData.filter(line => line.contains("a"))
      println("Lines with a: %s".format(numAs.count))
    }
  }
})

val threadB = new Thread(new Runnable {
  def run() {
    for (i <- 0 until end) {
      val numBs = logData.filter(line => line.contains("b"))
      println("Lines with b: %s".format(numBs.count))
    }
  }
})
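Note that the snippet as posted only defines the two threads; to actually launch the jobs they also need to be started (and optionally joined). A usage sketch, reusing the names above:

threadA.start()
threadB.start()
// Both Spark jobs now run concurrently against the shared, cached logData.
threadA.join()
threadB.join()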
Re: Columnar-Oriented RDDs
Hey,
I do not have any statistics. I just wanted to show it can be done, but left it at that. The memory usage should be predictable: the benefit comes from using arrays for primitive types. Accessing the data row-wise means re-assembling the rows from the columnar data, which I have not tried to profile or optimize at all yet, but for sure there should be some overhead compared to a row-oriented RDD. Also the format relies on compile-time types (which is what allows the usage of arrays for primitive types).

On Sun, Mar 1, 2015 at 6:33 AM, Night Wolf wrote:

> Thanks for the comments guys.
>
> Parquet is awesome. My question with using Parquet for on-disk storage is: how should I load that into memory as a Spark RDD, cache it, and keep it in a columnar format?
>
> I know I can use Spark SQL with Parquet, which is awesome. But as soon as I step out of SQL we have problems, as it kind of gets converted back to a row-oriented format.
>
> @Koert - that looks really exciting. Do you have any statistics on memory and scan performance?
>
> On Saturday, February 14, 2015, Koert Kuipers wrote:
>
>> I wrote a proof of concept to automatically store any RDD of tuples or case classes in columnar format using arrays (and strongly typed, so you get the benefit of primitive arrays). See: https://github.com/tresata/spark-columnar
>>
>> On Fri, Feb 13, 2015 at 3:06 PM, Michael Armbrust wrote:
>>
>>> Shark's in-memory code was ported to Spark SQL and is used by default when you run .cache on a SchemaRDD or CACHE TABLE.
>>>
>>> I'd also look at Parquet, which is more efficient and handles nested data better.
>>>
>>> On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'd like to build/use column-oriented RDDs in some of my Spark code. A normal Spark RDD is stored as a row-oriented object, if I understand correctly.
>>>>
>>>> I'd like to leverage some of the advantages of a columnar memory format. Shark (used to) and Spark SQL use a columnar storage format with primitive arrays for each column.
>>>>
>>>> I'd be interested to know more about this approach and how I could build my own custom columnar-oriented RDD which I can use outside of Spark SQL.
>>>>
>>>> Could anyone give me some pointers on where to look to do something like this, either from scratch or using what's there in the Spark SQL libs or elsewhere. I know Evan Chan made mention in a presentation of building a custom RDD of column-oriented blocks of data.
>>>>
>>>> Cheers,
>>>> ~N
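A toy illustration of the idea (not Koert's library): pack each partition of a case-class RDD into one primitive array per column, so the JVM stores the values unboxed, and re-assemble rows only on access. A sketch under the assumption of a fixed, fully primitive schema:

import org.apache.spark.rdd.RDD

case class Point(x: Double, y: Double)

// One columnar block per partition: two primitive double arrays instead of
// one boxed Point object per row.
case class PointBlock(xs: Array[Double], ys: Array[Double]) {
  def rows: Iterator[Point] = xs.indices.iterator.map(i => Point(xs(i), ys(i)))
}

def toColumnar(points: RDD[Point]): RDD[PointBlock] =
  points.mapPartitions { it =>
    val ps = it.toArray
    Iterator(PointBlock(ps.map(_.x), ps.map(_.y)))
  }

// Cache the columnar blocks and re-assemble rows only when needed:
// val blocks = toColumnar(points).cache()
// val rowView = blocks.flatMap(_.rows)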
Re: Columnar-Oriented RDDs
Thanks for the comments guys.

Parquet is awesome. My question with using Parquet for on-disk storage is: how should I load that into memory as a Spark RDD, cache it, and keep it in a columnar format?

I know I can use Spark SQL with Parquet, which is awesome. But as soon as I step out of SQL we have problems, as it kind of gets converted back to a row-oriented format.

@Koert - that looks really exciting. Do you have any statistics on memory and scan performance?

On Saturday, February 14, 2015, Koert Kuipers wrote:

> I wrote a proof of concept to automatically store any RDD of tuples or case classes in columnar format using arrays (and strongly typed, so you get the benefit of primitive arrays). See: https://github.com/tresata/spark-columnar
>
> On Fri, Feb 13, 2015 at 3:06 PM, Michael Armbrust wrote:
>
>> Shark's in-memory code was ported to Spark SQL and is used by default when you run .cache on a SchemaRDD or CACHE TABLE.
>>
>> I'd also look at Parquet, which is more efficient and handles nested data better.
>>
>> On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf wrote:
>>
>>> Hi all,
>>>
>>> I'd like to build/use column-oriented RDDs in some of my Spark code. A normal Spark RDD is stored as a row-oriented object, if I understand correctly.
>>>
>>> I'd like to leverage some of the advantages of a columnar memory format. Shark (used to) and Spark SQL use a columnar storage format with primitive arrays for each column.
>>>
>>> I'd be interested to know more about this approach and how I could build my own custom columnar-oriented RDD which I can use outside of Spark SQL.
>>>
>>> Could anyone give me some pointers on where to look to do something like this, either from scratch or using what's there in the Spark SQL libs or elsewhere. I know Evan Chan made mention in a presentation of building a custom RDD of column-oriented blocks of data.
>>>
>>> Cheers,
>>> ~N
Re: Iterating on RDDs
As you suggested, I tried to save the grouped RDD and persist it in memory before the iterations begin. The performance seems to be much better now. My previous comment that the runtimes doubled came from a wrong observation. Thanks.

On Fri, Feb 27, 2015 at 10:27 AM, Vijayasarathy Kannan wrote:

> Thanks.
>
> I tried persist() on the RDD. The runtimes appear to have doubled now (without persist() it was ~7s per iteration and now it's ~15s). I am running standalone Spark on an 8-core machine.
> Any thoughts on why the increase in runtime?
>
> On Thu, Feb 26, 2015 at 4:27 PM, Imran Rashid wrote:
>
>> val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
>> // or whatever persistence makes more sense for you ...
>> while (true) {
>>   val res = grouped.flatMap(F)
>>   res.collect.foreach(func)
>>   if (criteria)
>>     break
>> }
>>
>> On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan wrote:
>>
>>> Hi,
>>>
>>> I have the following use case.
>>>
>>> (1) I have an RDD of edges of a graph (say R).
>>> (2) do a groupBy on R (by say source vertex) and call a function F on each group.
>>> (3) collect the results from the Fs and do some computation.
>>> (4) repeat the above steps until some criterion is met.
>>>
>>> In (2), the groups are always going to be the same (since R is grouped by source vertex).
>>>
>>> Question:
>>> Is R distributed every iteration (when in (2)) or is it distributed only once, when it is created?
>>>
>>> A sample code snippet is below.
>>>
>>> while (true) {
>>>   val res = R.groupBy[VertexId](G).flatMap(F)
>>>   res.collect.foreach(func)
>>>   if (criteria)
>>>     break
>>> }
>>>
>>> Since the groups remain the same, what is the best way to go about implementing the above logic?
Re: Iterating on RDDs
Thanks.

I tried persist() on the RDD. The runtimes appear to have doubled now (without persist() it was ~7s per iteration and now it's ~15s). I am running standalone Spark on an 8-core machine.
Any thoughts on why the increase in runtime?

On Thu, Feb 26, 2015 at 4:27 PM, Imran Rashid wrote:

> val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
> // or whatever persistence makes more sense for you ...
> while (true) {
>   val res = grouped.flatMap(F)
>   res.collect.foreach(func)
>   if (criteria)
>     break
> }
>
> On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan wrote:
>
>> Hi,
>>
>> I have the following use case.
>>
>> (1) I have an RDD of edges of a graph (say R).
>> (2) do a groupBy on R (by say source vertex) and call a function F on each group.
>> (3) collect the results from the Fs and do some computation.
>> (4) repeat the above steps until some criterion is met.
>>
>> In (2), the groups are always going to be the same (since R is grouped by source vertex).
>>
>> Question:
>> Is R distributed every iteration (when in (2)) or is it distributed only once, when it is created?
>>
>> A sample code snippet is below.
>>
>> while (true) {
>>   val res = R.groupBy[VertexId](G).flatMap(F)
>>   res.collect.foreach(func)
>>   if (criteria)
>>     break
>> }
>>
>> Since the groups remain the same, what is the best way to go about implementing the above logic?
Re: Iterating on RDDs
val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
// or whatever persistence makes more sense for you ...
while (true) {
  val res = grouped.flatMap(F)
  res.collect.foreach(func)
  if (criteria)
    break
}

On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan wrote:

> Hi,
>
> I have the following use case.
>
> (1) I have an RDD of edges of a graph (say R).
> (2) do a groupBy on R (by say source vertex) and call a function F on each group.
> (3) collect the results from the Fs and do some computation.
> (4) repeat the above steps until some criterion is met.
>
> In (2), the groups are always going to be the same (since R is grouped by source vertex).
>
> Question:
> Is R distributed every iteration (when in (2)) or is it distributed only once, when it is created?
>
> A sample code snippet is below.
>
> while (true) {
>   val res = R.groupBy[VertexId](G).flatMap(F)
>   res.collect.foreach(func)
>   if (criteria)
>     break
> }
>
> Since the groups remain the same, what is the best way to go about implementing the above logic?
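One detail: `break` is not a Scala keyword, so a compilable version of the sketch above needs scala.util.control.Breaks (or a plain var flag). A sketch reusing the placeholder names R, G, F, func, and criteria from the thread:

import org.apache.spark.storage.StorageLevel
import scala.util.control.Breaks.{break, breakable}

// Persist the grouped RDD once; each iteration reuses the cached groups.
val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
breakable {
  while (true) {
    val res = grouped.flatMap(F)
    res.collect().foreach(func)
    if (criteria) break
  }
}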
Iterating on RDDs
Hi,

I have the following use case.

(1) I have an RDD of edges of a graph (say R).
(2) do a groupBy on R (by say source vertex) and call a function F on each group.
(3) collect the results from the Fs and do some computation.
(4) repeat the above steps until some criterion is met.

In (2), the groups are always going to be the same (since R is grouped by source vertex).

Question:
Is R distributed every iteration (when in (2)) or is it distributed only once, when it is created?

A sample code snippet is below.

while (true) {
  val res = R.groupBy[VertexId](G).flatMap(F)
  res.collect.foreach(func)
  if (criteria)
    break
}

Since the groups remain the same, what is the best way to go about implementing the above logic?
Re: Spark Streaming - Collecting RDDs into array in the driver program
Hi,

On Thu, Feb 26, 2015 at 11:24 AM, Thanigai Vellore <thanigai.vell...@gmail.com> wrote:

> It appears that the function immediately returns even before the foreachRDD stage is executed. Is that possible?

Sure, that's exactly what happens. foreachRDD() schedules a computation, it does not perform it. Maybe your streaming application would not ever terminate, but the function still needs to return, right?

If you remove the toArray(), you will return a reference to the ArrayBuffer that will be appended to over time. You can then, in a different thread, check the contents of that ArrayBuffer as processing happens, or wait until processing ends.

Tobias
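A sketch of the pattern Tobias describes, assuming `topSearches` is a DStream[(String, Int)]; since the streaming thread appends to the buffer while other threads may read it, access is synchronized here:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.streaming.dstream.DStream

def collectTop(topSearches: DStream[(String, Int)]): ArrayBuffer[(String, Int)] = {
  val summary = new ArrayBuffer[(String, Int)]()
  topSearches.foreachRDD { rdd =>
    // Runs later, once the StreamingContext has been started.
    summary.synchronized { summary ++= rdd.collect() }
  }
  summary // a live reference; inspect it from another thread as batches complete
}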
Re: Spark Streaming - Collecting RDDs into array in the driver program
I didn't include the complete driver code, but I do run the streaming context from the main program which calls this function. Again, I can print the RDD elements within the foreachRDD block, but the array that is returned is always empty. It appears that the function immediately returns even before the foreachRDD stage is executed. Is that possible?

On Feb 25, 2015 5:41 PM, "Tathagata Das" wrote:

> You are just setting up the computation here using foreachRDD. You have not even run the streaming context to get any data.
>
> On Wed, Feb 25, 2015 at 2:21 PM, Thanigai Vellore <thanigai.vell...@gmail.com> wrote:
>
>> I have this function in the driver program which collects the results from the RDDs (in a stream) into an array and returns it. However, even though the RDDs (in the DStream) have data, the function is returning an empty array... What am I doing wrong?
>>
>> I can print the RDD values inside the foreachRDD call, but the array is always empty.
>>
>> def runTopFunction(): Array[(String, Int)] = {
>>   val topSearches = ... // some function
>>   val summary = new ArrayBuffer[(String, Int)]()
>>   topSearches.foreachRDD(rdd => {
>>     summary = summary.++(rdd.collect())
>>   })
>>
>>   return summary.toArray
>> }
Re: Spark Streaming - Collecting RDDs into array in the driver program
You are just setting up the computation here using foreachRDD. You have not even run the streaming context to get any data.

On Wed, Feb 25, 2015 at 2:21 PM, Thanigai Vellore <thanigai.vell...@gmail.com> wrote:

> I have this function in the driver program which collects the results from the RDDs (in a stream) into an array and returns it. However, even though the RDDs (in the DStream) have data, the function is returning an empty array... What am I doing wrong?
>
> I can print the RDD values inside the foreachRDD call, but the array is always empty.
>
> def runTopFunction(): Array[(String, Int)] = {
>   val topSearches = ... // some function
>   val summary = new ArrayBuffer[(String, Int)]()
>   topSearches.foreachRDD(rdd => {
>     summary = summary.++(rdd.collect())
>   })
>
>   return summary.toArray
> }
Spark Streaming - Collecting RDDs into array in the driver program
I have this function in the driver program which collects the results from the RDDs (in a stream) into an array and returns it. However, even though the RDDs (in the DStream) have data, the function is returning an empty array... What am I doing wrong?

I can print the RDD values inside the foreachRDD call, but the array is always empty.

def runTopFunction(): Array[(String, Int)] = {
  val topSearches = ... // some function
  val summary = new ArrayBuffer[(String, Int)]()
  topSearches.foreachRDD(rdd => {
    summary = summary.++(rdd.collect())
  })

  return summary.toArray
}
Re: Executors dropping all memory-stored RDDs?
I have a strong suspicion that it was caused by a full disk on an executor. I am not sure the executor was supposed to recover that way from it. I cannot be certain (I should have had enough disk space), but I think I had some data skew, which could have led some executors to run out of disk.

So, in case someone else notices behavior like this, make sure you check your cluster monitor (like Ganglia).

On Wed, Jan 28, 2015 at 5:40 PM, Thomas Gerber wrote:

> Hello,
>
> I am storing RDDs with the MEMORY_ONLY_SER storage level during the run of a big job.
>
> At some point during the job, I went to the Executors page and saw that 80% of my executors did not have stored RDDs anymore (executors.png). On the Storage page, everything seems "there" (storage.png).
>
> But if I look at a given RDD (RDD_83.png), although it tells me on top that all 100 partitions are cached, when I look at the details, only 17 are actually stored (RDD_83_partitions), all on the 20% of executors that still had stored RDDs according to the Executors page.
>
> So I wonder:
> 1. Are those RDDs still cached (in which case we have a small reporting error), or not?
> 2. If not, what could cause an executor to drop its memory-stored RDD blocks?
>
> I guess a restart of an executor? When I compare an executor that seems to have dropped blocks with one that has not:
> *** their spark-hadoop-org.apache.spark.deploy.worker.Worker-1-ip-XX-XX-XX-XX.ec2.internal.out contents look the same
> *** they both have the same etime in ps (so, I guess no restart?)
> *** I didn't see anything in the app log in the work folder (but it is large, so I might have missed it)
>
> Also, I must mention that the cluster was doing a lot of GC, which might be the cause of the trouble.
>
> I would appreciate any pointers.
> Thomas
Re: Is there a limit to the number of RDDs in a Spark context?
Hi Sean,

Thanks a lot for your answer. That explains it, as I was creating thousands of RDDs, so I guess the communication overhead was the reason why the Spark job was freezing. After changing the code to use RDDs of pairs and aggregateByKey it works just fine, and quite fast.

Again, thanks a lot for your help.

Greetings,

Juan

2015-02-18 15:35 GMT+01:00 Sean Owen :

> At some level, enough RDDs creates hundreds of thousands of tiny partitions of data, each of which creates a task for each stage. The raw overhead of all the message passing can slow things down a lot. I would not design something to use an RDD per key. You would generally key by some value you want to divide and filter on, and then use a *ByKey to do your work.
>
> You don't work with individual RDDs this way, but usually that's good news. You usually have a lot more flexibility operating just in pure Java / Scala to do whatever you need inside your function.
>
> On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá wrote:
>
>> Hi Paweł,
>>
>> Thanks a lot for your answer. I finally got the program to work by using aggregateByKey, but I was wondering why creating thousands of RDDs doesn't work. I think that could be interesting for using methods that work on RDDs, like for example JavaDoubleRDD.stats() (http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29). If the groups are small then I can chain groupBy(), collect(), parallelize() and stats(), but that is quite inefficient, because it implies moving data to and from the driver, and also doesn't scale to big groups; on the other hand, if I use aggregateByKey or a similar function then I cannot use stats(), so I have to reimplement it. In general I was looking for a way to reuse other functions that I have that work on RDDs, for using them on groups of data in an RDD, because I don't see how to directly apply them to each of the groups in a pair RDD.
>>
>> Again, thanks a lot for your answer,
>>
>> Greetings,
>>
>> Juan Rodriguez
>>
>> 2015-02-18 14:56 GMT+01:00 Paweł Szulc :
>>
>>> Maybe you can avoid grouping with groupByKey altogether? What is your next step after grouping elements by key? Are you trying to reduce the values? If so, then I would recommend using a reducing function like reduceByKey or aggregateByKey. Those will first reduce the values for each key locally on each node before doing the actual IO over the network. There will also be no grouping phase, so you will not run into memory issues.
>>>
>>> Please let me know if that helped.
>>>
>>> Pawel Szulc
>>> @rabbitonweb
>>> http://www.rabbitonweb.com
>>>
>>> On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm writing a Spark program where I want to divide an RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for a big number of keys (tens of thousands) the execution gets stuck, without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does that make sense? I'm rewriting the program to use a single RDD of pairs, with cached partitions, but I wanted to be sure I understand the problem here.
>>>>
>>>> Thanks a lot in advance,
>>>>
>>>> Greetings,
>>>>
>>>> Juan Rodriguez
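For the stats() use case mentioned above, a sketch of the aggregateByKey approach: compute count, sum, and sum of squares per key in a single pass, instead of one RDD (and one stats() call) per key; the names are illustrative:

import org.apache.spark.rdd.RDD

// (count, sum, sum of squares) per key; mean and variance follow from these.
def statsByKey(data: RDD[(String, Double)]): RDD[(String, (Long, Double, Double))] =
  data.aggregateByKey((0L, 0.0, 0.0))(
    (acc, x) => (acc._1 + 1, acc._2 + x, acc._3 + x * x), // fold one value into a partial
    (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)     // merge two partials
  )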
Re: Is there a limit to the number of RDDs in a Spark context?
At some level, enough RDDs creates hundreds of thousands of tiny partitions of data, each of which creates a task for each stage. The raw overhead of all the message passing can slow things down a lot. I would not design something to use an RDD per key. You would generally key by some value you want to divide and filter on, and then use a *ByKey to do your work.

You don't work with individual RDDs this way, but usually that's good news. You usually have a lot more flexibility operating just in pure Java / Scala to do whatever you need inside your function.

On Wed, Feb 18, 2015 at 2:12 PM, Juan Rodríguez Hortalá wrote:

> Hi Paweł,
>
> Thanks a lot for your answer. I finally got the program to work by using aggregateByKey, but I was wondering why creating thousands of RDDs doesn't work. I think that could be interesting for using methods that work on RDDs, like for example JavaDoubleRDD.stats() (http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29). If the groups are small then I can chain groupBy(), collect(), parallelize() and stats(), but that is quite inefficient, because it implies moving data to and from the driver, and also doesn't scale to big groups; on the other hand, if I use aggregateByKey or a similar function then I cannot use stats(), so I have to reimplement it. In general I was looking for a way to reuse other functions that I have that work on RDDs, for using them on groups of data in an RDD, because I don't see how to directly apply them to each of the groups in a pair RDD.
>
> Again, thanks a lot for your answer,
>
> Greetings,
>
> Juan Rodriguez
>
> 2015-02-18 14:56 GMT+01:00 Paweł Szulc :
>
>> Maybe you can avoid grouping with groupByKey altogether? What is your next step after grouping elements by key? Are you trying to reduce the values? If so, then I would recommend using a reducing function like reduceByKey or aggregateByKey. Those will first reduce the values for each key locally on each node before doing the actual IO over the network. There will also be no grouping phase, so you will not run into memory issues.
>>
>> Please let me know if that helped.
>>
>> Pawel Szulc
>> @rabbitonweb
>> http://www.rabbitonweb.com
>>
>> On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá wrote:
>>
>>> Hi,
>>>
>>> I'm writing a Spark program where I want to divide an RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for a big number of keys (tens of thousands) the execution gets stuck, without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does that make sense? I'm rewriting the program to use a single RDD of pairs, with cached partitions, but I wanted to be sure I understand the problem here.
>>>
>>> Thanks a lot in advance,
>>>
>>> Greetings,
>>>
>>> Juan Rodriguez
Re: Is there a limit to the number of RDDs in a Spark context?
Hi Paweł,

Thanks a lot for your answer. I finally got the program to work by using aggregateByKey, but I was wondering why creating thousands of RDDs doesn't work. I think that could be interesting for using methods that work on RDDs, like for example JavaDoubleRDD.stats() (http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaDoubleRDD.html#stats%28%29). If the groups are small then I can chain groupBy(), collect(), parallelize() and stats(), but that is quite inefficient, because it implies moving data to and from the driver, and also doesn't scale to big groups; on the other hand, if I use aggregateByKey or a similar function then I cannot use stats(), so I have to reimplement it. In general I was looking for a way to reuse other functions that I have that work on RDDs, for using them on groups of data in an RDD, because I don't see how to directly apply them to each of the groups in a pair RDD.

Again, thanks a lot for your answer,

Greetings,

Juan Rodriguez

2015-02-18 14:56 GMT+01:00 Paweł Szulc :

> Maybe you can avoid grouping with groupByKey altogether? What is your next step after grouping elements by key? Are you trying to reduce the values? If so, then I would recommend using a reducing function like reduceByKey or aggregateByKey. Those will first reduce the values for each key locally on each node before doing the actual IO over the network. There will also be no grouping phase, so you will not run into memory issues.
>
> Please let me know if that helped.
>
> Pawel Szulc
> @rabbitonweb
> http://www.rabbitonweb.com
>
> On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm writing a Spark program where I want to divide an RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for a big number of keys (tens of thousands) the execution gets stuck, without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does that make sense? I'm rewriting the program to use a single RDD of pairs, with cached partitions, but I wanted to be sure I understand the problem here.
>>
>> Thanks a lot in advance,
>>
>> Greetings,
>>
>> Juan Rodriguez
Re: Is there a limit to the number of RDDs in a Spark context?
Maybe you can avoid grouping with groupByKey altogether? What is your next step after grouping elements by key? Are you trying to reduce the values? If so, then I would recommend using a reducing function like reduceByKey or aggregateByKey. Those will first reduce the values for each key locally on each node before doing the actual IO over the network. There will also be no grouping phase, so you will not run into memory issues.

Please let me know if that helped.

Pawel Szulc
@rabbitonweb
http://www.rabbitonweb.com

On Wed, Feb 18, 2015 at 12:06 PM, Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> I'm writing a Spark program where I want to divide an RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for a big number of keys (tens of thousands) the execution gets stuck, without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does that make sense? I'm rewriting the program to use a single RDD of pairs, with cached partitions, but I wanted to be sure I understand the problem here.
>
> Thanks a lot in advance,
>
> Greetings,
>
> Juan Rodriguez
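A minimal illustration of the map-side combine Paweł describes, with hypothetical data (`sc` is an existing SparkContext):

// reduceByKey merges the values for each key within every partition first,
// so only one partial sum per key per partition crosses the network.
val counts = sc.parallelize(Seq("a" -> 1, "b" -> 1, "a" -> 1))
  .reduceByKey(_ + _) // => ("a", 2), ("b", 1)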
Re: Creating RDDs from within foreachPartition() [Spark-Streaming]
You can't use RDDs inside RDDs. RDDs are managed from the driver, and functions like foreachRDD execute things on the remote executors. You can simply write code that directly saves whatever you want to ES. There is not necessarily a need to use RDDs for that.

On Wed, Feb 18, 2015 at 11:36 AM, t1ny wrote:

> Hi all,
>
> I am trying to create RDDs from within rdd.foreachPartition() so I can save these RDDs to ElasticSearch on the fly:
>
> stream.foreachRDD(rdd => {
>   rdd.foreachPartition { iterator =>
>     val sc = rdd.context
>     iterator.foreach {
>       case (cid, sid, ts) => {
>         [...]
>         sc.makeRDD(...).saveToEs(...) // <- throws a NullPointerException (sc is null)
>       }
>     }
>   }
> })
>
> Unfortunately this doesn't work, as I can't seem to be able to access the SparkContext from anywhere within foreachPartition(). The code above throws a NullPointerException, but if I change it to ssc.sc.makeRDD (where ssc is the StreamingContext object created in the main function, outside of foreachPartition) then I get a NotSerializableException.
>
> What is the correct way to do this?
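A sketch of the direct approach Sean suggests, assuming the elasticsearch-hadoop connector already used in the original code (`org.elasticsearch.spark._` provides saveToEs on RDDs); the index name and record shape are hypothetical:

import org.elasticsearch.spark._

stream.foreachRDD { rdd =>
  // Transform and save the batch RDD itself; no nested RDD creation needed.
  rdd.map { case (cid, sid, ts) => Map("cid" -> cid, "sid" -> sid, "ts" -> ts) }
    .saveToEs("events/docs")
}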
Creating RDDs from within foreachPartition() [Spark-Streaming]
Hi all,

I am trying to create RDDs from within rdd.foreachPartition() so I can save these RDDs to ElasticSearch on the fly:

stream.foreachRDD(rdd => {
  rdd.foreachPartition { iterator =>
    val sc = rdd.context
    iterator.foreach {
      case (cid, sid, ts) => {
        [...]
        sc.makeRDD(...).saveToEs(...) // <- throws a NullPointerException (sc is null)
      }
    }
  }
})

Unfortunately this doesn't work, as I can't seem to be able to access the SparkContext from anywhere within foreachPartition(). The code above throws a NullPointerException, but if I change it to ssc.sc.makeRDD (where ssc is the StreamingContext object created in the main function, outside of foreachPartition) then I get a NotSerializableException.

What is the correct way to do this?
Is there a limit to the number of RDDs in a Spark context?
Hi,

I'm writing a Spark program where I want to divide an RDD into different groups, but the groups are too big to use groupByKey. To cope with that, since I know in advance the list of keys for each group, I build a map from the keys to the RDDs that result from filtering the input RDD to get the records for the corresponding key. This works when I have a small number of keys, but for a big number of keys (tens of thousands) the execution gets stuck, without issuing any new Spark stage. I suspect the reason is that the Spark scheduler is not able to handle so many RDDs. Does that make sense? I'm rewriting the program to use a single RDD of pairs, with cached partitions, but I wanted to be sure I understand the problem here.

Thanks a lot in advance,

Greetings,

Juan Rodriguez
Re: Shuffle on joining two RDDs
This will be fixed by https://github.com/apache/spark/pull/4629

On Fri, Feb 13, 2015 at 10:43 AM, Imran Rashid wrote:

> yeah I thought the same thing at first too, I suggested something equivalent w/ preservesPartitioning = true, but that isn't enough. the join is done by union-ing the two transformed rdds, which is very different from the way it works under the hood in scala to enable narrow dependencies. It really needs a bigger change to pyspark. I filed this issue: https://issues.apache.org/jira/browse/SPARK-5785
>
> (and the somewhat related issue about documentation: https://issues.apache.org/jira/browse/SPARK-5786)
>
> partitioning should still work in pyspark, you still need some notion of distributing work, and the pyspark functions have a partitionFunc to decide that. But, I am not an authority on pyspark, so perhaps there are more holes I'm not aware of ...
>
> Imran
>
> On Fri, Feb 13, 2015 at 8:36 AM, Karlson wrote:
>
>> In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, wouldn't it help to change the lines
>>
>> vs = rdd.map(lambda (k, v): (k, (1, v)))
>> ws = other.map(lambda (k, v): (k, (2, v)))
>>
>> to
>>
>> vs = rdd.mapValues(lambda v: (1, v))
>> ws = other.mapValues(lambda v: (2, v))
>>
>> ?
>> As I understand it, this would preserve the original partitioning.
>>
>> On 2015-02-13 12:43, Karlson wrote:
>>
>>> Does that mean partitioning does not work in Python? Or does this only affect joining?
>>>
>>> On 2015-02-12 19:27, Davies Liu wrote:
>>>
>>>> The feature works as expected in Scala/Java, but is not implemented in Python.
>>>>
>>>> On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid wrote:
>>>>
>>>>> I wonder if the issue is that these lines just need to add preservesPartitioning = true ?
>>>>>
>>>>> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>>>>>
>>>>> I am getting the feeling this is an issue w/ pyspark
>>>>>
>>>>> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid wrote:
>>>>>
>>>>>> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api but you might have some better guesses here than I thought.
>>>>>>
>>>>>> My suggestion to do
>>>>>>
>>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>>
>>>>>> was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields are hidden deeper inside and are not user-visible. But I think a better way (in scala, anyway) is to look at rdd.dependencies. it's a little tricky, though, you need to look deep into the lineage (example at the end).
>>>>>>
>>>>>> Sean -- yes it does require both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions, you'll get equal HashPartitioners. There is a little difference in the scala & python api that I missed here. For partitionBy in scala, you actually need to specify the partitioner, but not in python. However I thought it would work like groupByKey, which does just take an int.
>>>>>>
>>>>>> Here's a code example in scala -- not sure what is available from python. Hopefully somebody knows a simpler way to confirm narrow dependencies??
>>>>>>
>>>>>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x ->
Re: Columnar-Oriented RDDs
I wrote a proof of concept to automatically store any RDD of tuples or case classes in columnar format using arrays (and strongly typed, so you get the benefit of primitive arrays). See: https://github.com/tresata/spark-columnar

On Fri, Feb 13, 2015 at 3:06 PM, Michael Armbrust wrote:

> Shark's in-memory code was ported to Spark SQL and is used by default when you run .cache on a SchemaRDD or CACHE TABLE.
>
> I'd also look at Parquet, which is more efficient and handles nested data better.
>
> On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf wrote:
>
>> Hi all,
>>
>> I'd like to build/use column-oriented RDDs in some of my Spark code. A normal Spark RDD is stored as a row-oriented object, if I understand correctly.
>>
>> I'd like to leverage some of the advantages of a columnar memory format. Shark (used to) and Spark SQL use a columnar storage format with primitive arrays for each column.
>>
>> I'd be interested to know more about this approach and how I could build my own custom columnar-oriented RDD which I can use outside of Spark SQL.
>>
>> Could anyone give me some pointers on where to look to do something like this, either from scratch or using what's there in the Spark SQL libs or elsewhere. I know Evan Chan made mention in a presentation of building a custom RDD of column-oriented blocks of data.
>>
>> Cheers,
>> ~N
Re: Columnar-Oriented RDDs
Shark's in-memory code was ported to Spark SQL and is used by default when you run .cache on a SchemaRDD or CACHE TABLE.

I'd also look at Parquet, which is more efficient and handles nested data better.

On Fri, Feb 13, 2015 at 7:36 AM, Night Wolf wrote:

> Hi all,
>
> I'd like to build/use column-oriented RDDs in some of my Spark code. A normal Spark RDD is stored as a row-oriented object, if I understand correctly.
>
> I'd like to leverage some of the advantages of a columnar memory format. Shark (used to) and Spark SQL use a columnar storage format with primitive arrays for each column.
>
> I'd be interested to know more about this approach and how I could build my own custom columnar-oriented RDD which I can use outside of Spark SQL.
>
> Could anyone give me some pointers on where to look to do something like this, either from scratch or using what's there in the Spark SQL libs or elsewhere. I know Evan Chan made mention in a presentation of building a custom RDD of column-oriented blocks of data.
>
> Cheers,
> ~N
Re: Shuffle on joining two RDDs
yeah I thought the same thing at first too, I suggested something equivalent w/ preservesPartitioning = true, but that isn't enough. the join is done by union-ing the two transformed rdds, which is very different from the way it works under the hood in scala to enable narrow dependencies. It really needs a bigger change to pyspark. I filed this issue: https://issues.apache.org/jira/browse/SPARK-5785

(and the somewhat related issue about documentation: https://issues.apache.org/jira/browse/SPARK-5786)

partitioning should still work in pyspark, you still need some notion of distributing work, and the pyspark functions have a partitionFunc to decide that. But, I am not an authority on pyspark, so perhaps there are more holes I'm not aware of ...

Imran

On Fri, Feb 13, 2015 at 8:36 AM, Karlson wrote:

> In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, wouldn't it help to change the lines
>
> vs = rdd.map(lambda (k, v): (k, (1, v)))
> ws = other.map(lambda (k, v): (k, (2, v)))
>
> to
>
> vs = rdd.mapValues(lambda v: (1, v))
> ws = other.mapValues(lambda v: (2, v))
>
> ?
> As I understand it, this would preserve the original partitioning.
>
> On 2015-02-13 12:43, Karlson wrote:
>
>> Does that mean partitioning does not work in Python? Or does this only affect joining?
>>
>> On 2015-02-12 19:27, Davies Liu wrote:
>>
>>> The feature works as expected in Scala/Java, but is not implemented in Python.
>>>
>>> On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid wrote:
>>>
>>>> I wonder if the issue is that these lines just need to add preservesPartitioning = true ?
>>>>
>>>> https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38
>>>>
>>>> I am getting the feeling this is an issue w/ pyspark
>>>>
>>>> On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid wrote:
>>>>
>>>>> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api but you might have some better guesses here than I thought.
>>>>>
>>>>> My suggestion to do
>>>>>
>>>>> joinedRdd.getPartitions.foreach{println}
>>>>>
>>>>> was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields are hidden deeper inside and are not user-visible. But I think a better way (in scala, anyway) is to look at rdd.dependencies. it's a little tricky, though, you need to look deep into the lineage (example at the end).
>>>>>
>>>>> Sean -- yes it does require both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions, you'll get equal HashPartitioners. There is a little difference in the scala & python api that I missed here. For partitionBy in scala, you actually need to specify the partitioner, but not in python. However I thought it would work like groupByKey, which does just take an int.
>>>>>
>>>>> Here's a code example in scala -- not sure what is available from python. Hopefully somebody knows a simpler way to confirm narrow dependencies??
>>>>>
>>>>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>>>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
>>>>> scala> d.partitioner == d2.partitioner
>>>>> res2: Boolean = true
>>>>> val joined = d.join(d2)
>>>>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
>>>>> val badJoined = d.join(d3)
>>>>>
>>>>> d.setName("d")
>>>>> d2.setName("d2")
>>>>> d3.setName("d3")
Columnar-Oriented RDDs
Hi all,

I'd like to build/use column-oriented RDDs in some of my Spark code. A normal Spark RDD is stored as a row-oriented object, if I understand correctly.

I'd like to leverage some of the advantages of a columnar memory format. Shark (used to) and Spark SQL use a columnar storage format with primitive arrays for each column.

I'd be interested to know more about this approach and how I could build my own custom columnar-oriented RDD which I can use outside of Spark SQL.

Could anyone give me some pointers on where to look to do something like this, either from scratch or using what's there in the Spark SQL libs or elsewhere. I know Evan Chan made mention in a presentation of building a custom RDD of column-oriented blocks of data.

Cheers,
~N
Re: Shuffle on joining two RDDs
In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, wouldn't it help to change the lines

vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))

to

vs = rdd.mapValues(lambda v: (1, v))
ws = other.mapValues(lambda v: (2, v))

?
As I understand it, this would preserve the original partitioning.

On 2015-02-13 12:43, Karlson wrote:

Does that mean partitioning does not work in Python? Or does this only affect joining?

On 2015-02-12 19:27, Davies Liu wrote:

The feature works as expected in Scala/Java, but is not implemented in Python.

On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid wrote:

I wonder if the issue is that these lines just need to add preservesPartitioning = true ?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark

On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid wrote:

ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api but you might have some better guesses here than I thought.

My suggestion to do

joinedRdd.getPartitions.foreach{println}

was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields are hidden deeper inside and are not user-visible. But I think a better way (in scala, anyway) is to look at rdd.dependencies. it's a little tricky, though, you need to look deep into the lineage (example at the end).

Sean -- yes it does require both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions, you'll get equal HashPartitioners. There is a little difference in the scala & python api that I missed here. For partitionBy in scala, you actually need to specify the partitioner, but not in python. However I thought it would work like groupByKey, which does just take an int.

Here's a code example in scala -- not sure what is available from python. Hopefully somebody knows a simpler way to confirm narrow dependencies??

val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
scala> d.partitioner == d2.partitioner
res2: Boolean = true
val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")

// unfortunately, just looking at the immediate dependencies of joined &
// badJoined is misleading, b/c join actually creates one more step after
// the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)

// even with the join that does require a shuffle, we still see a
// OneToOneDependency, but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)

// so let's make a helper function to get all the dependencies recursively
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}

// full dependencies of the good join
scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)

// full dependencies of the bad join -- notice the ShuffleDependency!
scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.
Re: Shuffle on joining two RDDs
Does that mean partitioning does not work in Python? Or does this only effect joining? On 2015-02-12 19:27, Davies Liu wrote: The feature works as expected in Scala/Java, but not implemented in Python. On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid wrote: I wonder if the issue is that these lines just need to add preservesPartitioning = true ? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 I am getting the feeling this is an issue w/ pyspark On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid wrote: ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api but you might have some better guesses here than I thought. My suggestion to do joinedRdd.getPartitions.foreach{println} was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields are hidden deeper inside and are not user-visible. But I think a better way (in scala, anyway) is to look at rdd.dependencies. its a little tricky, though, you need to look deep into the lineage (example at the end). Sean -- yes it does require both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions, you'll get equal HashPartitioners. There is a little difference in the scala & python api that I missed here. For partitionBy in scala, you actually need to specify the partitioner, but not in python. However I thought it would work like groupByKey, which does just take an int. Here's a code example in scala -- not sure what is available from python. Hopefully somebody knows a simpler way to confirm narrow dependencies?? 
val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

scala> d.partitioner == d2.partitioner
res2: Boolean = true

val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")

// unfortunately, just looking at the immediate dependencies of joined & badJoined
// is misleading, b/c join actually creates one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)
// even with the join that does require a shuffle, we still see a OneToOneDependency,
// but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)

// so let's make a helper function to get all the dependencies recursively
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}

// full dependencies of the good join
scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at :16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at :16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at :16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at :16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at :12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at :12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at :12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at :12,org.apache.spark.OneToOneDependency@36b5f6f2)

// full dependencies of the bad join -- notice the ShuffleDependency!
scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at :16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at :16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at :16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at :16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at :12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at :12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at :12,org.apache.spark.ShuffleDependency@d794984)
(MappedRDD[11] at map at :12,org.apache.spark.OneToOneDependency@15c98005)

On Thu, Feb 12, 2015 at 10:05 AM, Karlson wrote: Hi Imran, thanks for your quick reply. Actually I am doing this:

rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()

followed by

rddA.count()
rddB.count()

then joinedRDD = rddA.join(rddB)

I thought that the
Re: Shuffle on joining two RDDs
The feature works as expected in Scala/Java, but not implemented in Python. On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid wrote: > I wonder if the issue is that these lines just need to add > preservesPartitioning = true > ? > > https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 > > I am getting the feeling this is an issue w/ pyspark > > > On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid wrote: >> >> ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It >> could be that pyspark doesn't properly support narrow dependencies, or maybe >> you need to be more explicit about the partitioner. I am looking into the >> pyspark api but you might have some better guesses here than I thought. >> >> My suggestion to do >> >> joinedRdd.getPartitions.foreach{println} >> >> was just to see if the partition was a NarrowCoGroupSplitDep or a >> ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields >> are hidden deeper inside and are not user-visible. But I think a better way >> (in scala, anyway) is to look at rdd.dependencies. its a little tricky, >> though, you need to look deep into the lineage (example at the end). >> >> Sean -- yes it does require both RDDs have the same partitioner, but that >> should happen naturally if you just specify the same number of partitions, >> you'll get equal HashPartitioners. There is a little difference in the >> scala & python api that I missed here. For partitionBy in scala, you >> actually need to specify the partitioner, but not in python. However I >> thought it would work like groupByKey, which does just take an int. >> >> >> Here's a code example in scala -- not sure what is available from python. >> Hopefully somebody knows a simpler way to confirm narrow dependencies?? 
>> >>> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64) >>> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64) >>> scala> d.partitioner == d2.partitioner >>> res2: Boolean = true >>> val joined = d.join(d2) >>> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100) >>> val badJoined = d.join(d3) >>> >>> d.setName("d") >>> d2.setName("d2") >>> d3.setName("d3") >>> joined.setName("joined") >>> badJoined.setName("badJoined") >>> >>> >>> //unfortunatley, just looking at the immediate dependencies of joined & >>> badJoined is misleading, b/c join actually creates >>> // one more step after the shuffle >>> scala> joined.dependencies >>> res20: Seq[org.apache.spark.Dependency[_]] = >>> List(org.apache.spark.OneToOneDependency@74751ac8) >>> //even with the join that does require a shuffle, we still see a >>> OneToOneDependency, but thats just a simple flatMap step >>> scala> badJoined.dependencies >>> res21: Seq[org.apache.spark.Dependency[_]] = >>> List(org.apache.spark.OneToOneDependency@1cf356cc) >> >> >> >>> >>> //so lets make a helper function to get all the dependencies recursively >>> >>> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = { >>> val deps = rdd.dependencies >>> deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)} >>> } >>> >>> >>> //full dependencies of the good join >>> >>> scala> flattenDeps(joined).foreach{println} >>> (joined FlatMappedValuesRDD[9] at join at >>> :16,org.apache.spark.OneToOneDependency@74751ac8) >>> (MappedValuesRDD[8] at join at >>> :16,org.apache.spark.OneToOneDependency@623264af) >>> (CoGroupedRDD[7] at join at >>> :16,org.apache.spark.OneToOneDependency@5a704f86) >>> (CoGroupedRDD[7] at join at >>> :16,org.apache.spark.OneToOneDependency@37514cd) >>> (d ShuffledRDD[3] at groupByKey at >>> :12,org.apache.spark.ShuffleDependency@7ba8a080) >>> (MappedRDD[2] at map at >>> :12,org.apache.spark.OneToOneDependency@7bc172ec) >>> (d2 ShuffledRDD[6] at groupByKey at >>> :12,org.apache.spark.ShuffleDependency@5960236d) >>> (MappedRDD[5] at map at >>> :12,org.apache.spark.OneToOneDependency@36b5f6f2) >>> >>> >>> >>> //full dependencies of the bad join -- notice the ShuffleDependency! >>> >>> scala> flattenDeps(badJoined).foreach{println} >>> (badJoined FlatMappedValuesRDD[15] a
Re: Shuffle on joining two RDDs
I wonder if the issue is that these lines just need to add preservesPartitioning = true ? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 I am getting the feeling this is an issue w/ pyspark On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid wrote: > ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It > could be that pyspark doesn't properly support narrow dependencies, or > maybe you need to be more explicit about the partitioner. I am looking > into the pyspark api but you might have some better guesses here than I > thought. > > My suggestion to do > > joinedRdd.getPartitions.foreach{println} > > was just to see if the partition was a NarrowCoGroupSplitDep or a > ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those > fields are hidden deeper inside and are not user-visible. But I think a > better way (in scala, anyway) is to look at rdd.dependencies. its a little > tricky, though, you need to look deep into the lineage (example at the end). > > Sean -- yes it does require both RDDs have the same partitioner, but that > should happen naturally if you just specify the same number of partitions, > you'll get equal HashPartitioners. There is a little difference in the > scala & python api that I missed here. For partitionBy in scala, you > actually need to specify the partitioner, but not in python. However I > thought it would work like groupByKey, which does just take an int. > > > Here's a code example in scala -- not sure what is available from python. > Hopefully somebody knows a simpler way to confirm narrow dependencies?? > > val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64) >> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64) >> scala> d.partitioner == d2.partitioner >> res2: Boolean = true >> val joined = d.join(d2) >> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100) >> val badJoined = d.join(d3) >> >> d.setName("d") >> d2.setName("d2") >> d3.setName("d3") >> joined.setName("joined") >> badJoined.setName("badJoined") > > >> //unfortunatley, just looking at the immediate dependencies of joined & >> badJoined is misleading, b/c join actually creates >> // one more step after the shuffle >> scala> joined.dependencies >> res20: Seq[org.apache.spark.Dependency[_]] = >> List(org.apache.spark.OneToOneDependency@74751ac8) >> //even with the join that does require a shuffle, we still see a >> OneToOneDependency, but thats just a simple flatMap step >> scala> badJoined.dependencies >> res21: Seq[org.apache.spark.Dependency[_]] = >> List(org.apache.spark.OneToOneDependency@1cf356cc) >> > > > >> //so lets make a helper function to get all the dependencies recursively >> > def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = { >> val deps = rdd.dependencies >> deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)} >> } >> >> >> //full dependencies of the good join > > scala> flattenDeps(joined).foreach{println} >> (joined FlatMappedValuesRDD[9] at join at >> :16,org.apache.spark.OneToOneDependency@74751ac8) >> (MappedValuesRDD[8] at join at >> :16,org.apache.spark.OneToOneDependency@623264af) >> >> *(CoGroupedRDD[7] at join at >> :16,org.apache.spark.OneToOneDependency@5a704f86)*(CoGroupedRDD[7] >> at join at :16,org.apache.spark.OneToOneDependency@37514cd) >> (d ShuffledRDD[3] at groupByKey at >> :12,org.apache.spark.ShuffleDependency@7ba8a080) >> (MappedRDD[2] at map at >> :12,org.apache.spark.OneToOneDependency@7bc172ec) >> (d2 ShuffledRDD[6] at 
groupByKey at >> :12,org.apache.spark.ShuffleDependency@5960236d) >> (MappedRDD[5] at map at >> :12,org.apache.spark.OneToOneDependency@36b5f6f2) >> >> > >> //full dependencies of the bad join -- notice the ShuffleDependency! > > scala> flattenDeps(badJoined).foreach{println} >> (badJoined FlatMappedValuesRDD[15] at join at >> :16,org.apache.spark.OneToOneDependency@1cf356cc) >> (MappedValuesRDD[14] at join at >> :16,org.apache.spark.OneToOneDependency@5dea4db) >> >> *(CoGroupedRDD[13] at join at >> :16,org.apache.spark.ShuffleDependency@5c1928df)*(CoGroupedRDD[13] >> at join at :16,org.apache.spark.OneToOneDependency@77ca77b5) >> (d ShuffledRDD[3] at groupByKey at >> :12,org.apache.spark.ShuffleDependency@7ba8a080) >> (MappedRDD[2] at map at >> :12,org.apache.spark.OneToOneDependency@7bc172ec) >> (d3 Shuffl
Re: Shuffle on joining two RDDs
ah, sorry, I am not too familiar w/ pyspark, so I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api, but you might have better guesses here than I do. My suggestion to do joinedRdd.getPartitions.foreach{println} was just to see whether the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion; those fields are hidden deeper inside and are not user-visible. But I think a better way (in Scala, anyway) is to look at rdd.dependencies. It's a little tricky, though: you need to look deep into the lineage (example at the end). Sean -- yes, it does require that both RDDs have the same partitioner, but that should happen naturally: if you just specify the same number of partitions, you'll get equal HashPartitioners. There is a little difference in the Scala & Python APIs that I missed here. For partitionBy in Scala you actually need to specify the partitioner, but not in Python. However, I thought it would work like groupByKey, which just takes an int. Here's a code example in Scala -- not sure what is available from Python. Hopefully somebody knows a simpler way to confirm narrow dependencies??

> val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
> val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
> scala> d.partitioner == d2.partitioner
> res2: Boolean = true
> val joined = d.join(d2)
> val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
> val badJoined = d.join(d3)
>
> d.setName("d")
> d2.setName("d2")
> d3.setName("d3")
> joined.setName("joined")
> badJoined.setName("badJoined")
>
> // unfortunately, just looking at the immediate dependencies of joined & badJoined
> // is misleading, b/c join actually creates one more step after the shuffle
> scala> joined.dependencies
> res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)
> // even with the join that does require a shuffle, we still see a OneToOneDependency,
> // but that's just a simple flatMap step
> scala> badJoined.dependencies
> res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)
>
> // so let's make a helper function to get all the dependencies recursively
> def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
>   val deps = rdd.dependencies
>   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
> }
>
> // full dependencies of the good join
> scala> flattenDeps(joined).foreach{println}
> (joined FlatMappedValuesRDD[9] at join at :16,org.apache.spark.OneToOneDependency@74751ac8)
> (MappedValuesRDD[8] at join at :16,org.apache.spark.OneToOneDependency@623264af)
> (CoGroupedRDD[7] at join at :16,org.apache.spark.OneToOneDependency@5a704f86)
> (CoGroupedRDD[7] at join at :16,org.apache.spark.OneToOneDependency@37514cd)
> (d ShuffledRDD[3] at groupByKey at :12,org.apache.spark.ShuffleDependency@7ba8a080)
> (MappedRDD[2] at map at :12,org.apache.spark.OneToOneDependency@7bc172ec)
> (d2 ShuffledRDD[6] at groupByKey at :12,org.apache.spark.ShuffleDependency@5960236d)
> (MappedRDD[5] at map at :12,org.apache.spark.OneToOneDependency@36b5f6f2)
>
> // full dependencies of the bad join -- notice the ShuffleDependency!
> scala> flattenDeps(badJoined).foreach{println}
> (badJoined FlatMappedValuesRDD[15] at join at :16,org.apache.spark.OneToOneDependency@1cf356cc)
> (MappedValuesRDD[14] at join at :16,org.apache.spark.OneToOneDependency@5dea4db)
> (CoGroupedRDD[13] at join at :16,org.apache.spark.ShuffleDependency@5c1928df)
> (CoGroupedRDD[13] at join at :16,org.apache.spark.OneToOneDependency@77ca77b5)
> (d ShuffledRDD[3] at groupByKey at :12,org.apache.spark.ShuffleDependency@7ba8a080)
> (MappedRDD[2] at map at :12,org.apache.spark.OneToOneDependency@7bc172ec)
> (d3 ShuffledRDD[12] at groupByKey at :12,org.apache.spark.ShuffleDependency@d794984)
> (MappedRDD[11] at map at :12,org.apache.spark.OneToOneDependency@15c98005)

On Thu, Feb 12, 2015 at 10:05 AM, Karlson wrote: > Hi Imran, > > thanks for your quick reply. > > Actually I am doing this: > > rddA = rddA.partitionBy(n).cache() > rddB = rddB.partitionBy(n).cache() > > followed by > > rddA.count() > rddB.count() > > then joinedRDD = rddA.join(rddB) > > I thought that the count() would force the evaluation, so any subsequent > joins would be shuffleless. I was wrong about the shuffle amounts however. > The shuffle write is
Re: Shuffle on joining two RDDs
Hi, I believe that partitionBy will use the same (default) partitioner on both RDDs. On 2015-02-12 17:12, Sean Owen wrote: Doesn't this require that both RDDs have the same partitioner? On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid wrote: Hi Karlson, I think your assumptions are correct -- that join alone shouldn't require any shuffling. But its possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized & cached somewhere? eg., if you just did this: val rddA = someData.partitionBy(N) val rddB = someOtherData.partitionBy(N) val joinedRdd = rddA.join(rddB) joinedRdd.count() //or any other action then the partitioning isn't actually getting run until you do the join. So though the join itself can happen without partitioning, joinedRdd.count() will trigger the evaluation of rddA & rddB which will require shuffles. Note that even if you have some intervening action on rddA & rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join. If this doesn't help explain things, for debugging joinedRdd.getPartitions.foreach{println} this is getting into the weeds, but at least this will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?) hope this helps, Imran On Thu, Feb 12, 2015 at 9:25 AM, Karlson wrote: Hi All, using Pyspark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value). I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex). Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040 however reveals that that assumption is false. In fact I am seeing shuffle writes of about 200MB and reads of about 50MB. What's the explanation for that behaviour? Where am I wrong with my assumption? Thanks in advance, Karlson - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Shuffle on joining two RDDs
Doesn't this require that both RDDs have the same partitioner? On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid wrote: > Hi Karlson, > > I think your assumptions are correct -- that join alone shouldn't require > any shuffling. But its possible you are getting tripped up by lazy > evaluation of RDDs. After you do your partitionBy, are you sure those RDDs > are actually materialized & cached somewhere? eg., if you just did this: > > val rddA = someData.partitionBy(N) > val rddB = someOtherData.partitionBy(N) > val joinedRdd = rddA.join(rddB) > joinedRdd.count() //or any other action > > then the partitioning isn't actually getting run until you do the join. So > though the join itself can happen without partitioning, joinedRdd.count() > will trigger the evaluation of rddA & rddB which will require shuffles. > Note that even if you have some intervening action on rddA & rddB that > shuffles them, unless you persist the result, you will need to reshuffle > them for the join. > > If this doesn't help explain things, for debugging > > joinedRdd.getPartitions.foreach{println} > > this is getting into the weeds, but at least this will tell us whether or > not you are getting narrow dependencies, which would avoid the shuffle. > (Does anyone know of a simpler way to check this?) > > hope this helps, > Imran > > > > > On Thu, Feb 12, 2015 at 9:25 AM, Karlson wrote: >> >> Hi All, >> >> using Pyspark, I create two RDDs (one with about 2M records (~200MB), the >> other with about 8M records (~2GB)) of the format (key, value). >> >> I've done a partitionBy(num_partitions) on both RDDs and verified that >> both RDDs have the same number of partitions and that equal keys reside on >> the same partition (via mapPartitionsWithIndex). >> >> Now I'd expect that for a join on the two RDDs no shuffling is necessary. >> Looking at the Web UI under http://driver:4040 however reveals that that >> assumption is false. >> >> In fact I am seeing shuffle writes of about 200MB and reads of about 50MB. >> >> What's the explanation for that behaviour? Where am I wrong with my >> assumption? >> >> Thanks in advance, >> >> Karlson >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Shuffle on joining two RDDs
Hi Imran, thanks for your quick reply. Actually I am doing this:

rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()

followed by

rddA.count()
rddB.count()

then

joinedRDD = rddA.join(rddB)

I thought that the count() would force the evaluation, so any subsequent joins would be shuffleless. I was wrong about the shuffle amounts, however. The shuffle write is actually 2GB (i.e. the size of the bigger RDD) while there is no shuffle read. A joinedRdd.count() does a shuffle read of about 1GB in size, though. The getPartitions method does not exist on the resulting RDD (I am using the Python API). There is however foreachPartition(). What is the line joinedRdd.getPartitions.foreach{println} supposed to do? Thank you, Karlson PS: Sorry for sending this twice, I accidentally did not reply to the mailing list first. On 2015-02-12 16:48, Imran Rashid wrote: Hi Karlson, I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized & cached somewhere? E.g., if you just did this: val rddA = someData.partitionBy(N) val rddB = someOtherData.partitionBy(N) val joinedRdd = rddA.join(rddB) joinedRdd.count() //or any other action then the partitioning isn't actually getting run until you do the join. So though the join itself can happen without partitioning, joinedRdd.count() will trigger the evaluation of rddA & rddB which will require shuffles. Note that even if you have some intervening action on rddA & rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join. If this doesn't help explain things, for debugging joinedRdd.getPartitions.foreach{println} this is getting into the weeds, but at least this will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?) hope this helps, Imran On Thu, Feb 12, 2015 at 9:25 AM, Karlson wrote: Hi All, using Pyspark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value). I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex). Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040 however reveals that that assumption is false. In fact I am seeing shuffle writes of about 200MB and reads of about 50MB. What's the explanation for that behaviour? Where am I wrong with my assumption? Thanks in advance, Karlson - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Shuffle on joining two RDDs
Hi Karlson, I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized & cached somewhere? E.g., if you just did this:

val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() // or any other action

then the partitioning isn't actually getting run until you do the join. So though the join itself can happen without partitioning, joinedRdd.count() will trigger the evaluation of rddA & rddB, which will require shuffles. Note that even if you have some intervening action on rddA & rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join. If this doesn't help explain things, for debugging try

joinedRdd.getPartitions.foreach{println}

This is getting into the weeds, but at least it will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?) hope this helps, Imran On Thu, Feb 12, 2015 at 9:25 AM, Karlson wrote: > Hi All, > > using Pyspark, I create two RDDs (one with about 2M records (~200MB), the > other with about 8M records (~2GB)) of the format (key, value). > > I've done a partitionBy(num_partitions) on both RDDs and verified that > both RDDs have the same number of partitions and that equal keys reside on > the same partition (via mapPartitionsWithIndex). > > Now I'd expect that for a join on the two RDDs no shuffling is necessary. > Looking at the Web UI under http://driver:4040 however reveals that that > assumption is false. > > In fact I am seeing shuffle writes of about 200MB and reads of about 50MB. > > What's the explanation for that behaviour? Where am I wrong with my > assumption? > > Thanks in advance, > > Karlson > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
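As a concrete illustration of the advice in this thread (a sketch of my own, not code from the thread; the names, sizes, and partition count are made up): co-partition both sides with the same partitioner, persist and materialize them, and only then join, so the join stage sees narrow dependencies.

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel

val part = new HashPartitioner(64)
// partition both sides identically, and persist so the partitioned data survives
val rddA = sc.parallelize(1 to 100000).map(x => (x, x)).partitionBy(part).persist(StorageLevel.MEMORY_ONLY)
val rddB = sc.parallelize(1 to 100000).map(x => (x, x * 2)).partitionBy(part).persist(StorageLevel.MEMORY_ONLY)
rddA.count() // actions force the partitioning shuffles to happen now...
rddB.count()
val joined = rddA.join(rddB) // ...so the join itself sees only narrow dependencies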
Shuffle on joining two RDDs
Hi All, using Pyspark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value). I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex). Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040 however reveals that that assumption is false. In fact I am seeing shuffle writes of about 200MB and reads of about 50MB. What's the explanation for that behaviour? Where am I wrong with my assumption? Thanks in advance, Karlson - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Zipping RDDs of equal size not possible
Hi Xiangrui, I'm sorry, I didn't notice your mail. What I did is a workaround that only works for my special case. It does not scale and only works for small data sets, but that is fine for me so far. Kind Regards, Niklas

def securlyZipRdds[A, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
  val rdd1Repartitioned = rdd1.repartition(1)
  val rdd2Repartitioned = rdd2.repartition(1)
  val (rdd1Balanced, rdd2Balanced) = balanceRddSizes(rdd1Repartitioned, rdd2Repartitioned)
  rdd1Balanced.zip(rdd2Balanced)
}

def balanceRddSizes[A, B](rdd1: RDD[A], rdd2: RDD[B]): (RDD[A], RDD[B]) = {
  val rdd1count = rdd1.count()
  val rdd2count = rdd2.count()
  val difference = math.abs(rdd1count - rdd2count).toInt
  if (rdd1count > rdd2count) {
    (removeRandomElements(rdd1, difference), rdd2)
  } else if (rdd2count > rdd1count) {
    (rdd1, removeRandomElements(rdd2, difference))
  } else {
    (rdd1, rdd2)
  }
}

def removeRandomElements[A](rdd: RDD[A], numberOfElements: Int): RDD[A] = {
  val sample: Array[A] = rdd.takeSample(false, numberOfElements)
  val set: Set[A] = Set(sample: _*)
  rdd.filter(x => !set.contains(x))
}

On 10.01.2015 06:56, Xiangrui Meng wrote: > "sample 2 * n tuples, split them into two parts, balance the sizes of > these parts by filtering some tuples out" > > How do you guarantee that the two RDDs have the same size? > > -Xiangrui > > On Fri, Jan 9, 2015 at 3:40 AM, Niklas Wilcke > <1wil...@informatik.uni-hamburg.de> wrote: >> Hi Spark community, >> >> I have a problem with zipping two RDDs of the same size and same number of >> partitions. >> The error message says that zipping is only allowed on RDDs which are >> partitioned into chunks of exactly the same sizes. >> How can I assure this? My workaround at the moment is to repartition both >> RDDs to only one partition but that obviously >> does not scale. >> >> This problem originates from my problem to draw n random tuple pairs (Tuple, >> Tuple) from an RDD[Tuple]. >> What I do is to sample 2 * n tuples, split them into two parts, balance the >> sizes of these parts >> by filtering some tuples out and zipping them together. >> >> I would appreciate to read better approaches for both problems. >> >> Thanks in advance, >> Niklas - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
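A more scalable alternative worth considering (my suggestion, not from the thread; the helper name zipViaIndex is made up): index both RDDs with zipWithIndex and join on the index. This avoids zip's equal-partition-sizes requirement and never funnels the data through a single partition.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

def zipViaIndex[A: ClassTag, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
  val indexed1 = rdd1.zipWithIndex().map(_.swap) // (position, element)
  val indexed2 = rdd2.zipWithIndex().map(_.swap)
  indexed1.join(indexed2).values // the inner join silently drops the longer tail
}

The trade-off: the join shuffles by index, so this swaps the repartition(1) bottleneck for a regular distributed shuffle, and any unmatched trailing elements are dropped rather than sampled away.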
Re: New combination-like RDD based on two RDDs
Problem solved. A simple join will do the work:

val prefix = new PairRDDFunctions[Int, Set[Int]](sc.parallelize(List((9, Set(4)), (1, Set(3)), (2, Set(5)), (2, Set(4)))))
val suffix = sc.parallelize(List((1, Set(1)), (2, Set(6)), (2, Set(5)), (2, Set(7))))
prefix.join(suffix).collect().foreach(println)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/New-combination-like-RDD-based-on-two-RDDs-tp21508p21511.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: New combination-like RDD based on two RDDs
You should use join:

val rdd1 = sc.parallelize(List((1,(3)), (2,(5)), (3,(6))))
val rdd2 = sc.parallelize(List((2,(1)), (2,(3)), (3,(9))))
rdd1.join(rdd2).collect
res0: Array[(Int, (Int, Int))] = Array((2,(5,1)), (2,(5,3)), (3,(6,9)))

Please see my cheat sheet at 3.14 join(otherDataset, [numTasks]): http://www.openkb.info/2015/01/scala-on-spark-cheatsheet.html On Wed, Feb 4, 2015 at 3:52 PM, dash wrote: > Hey Spark gurus! Sorry for the confusing title. I do not know the exact > description of my problem; if you know it, please tell me so I can change it > :-) > > Say I have two RDDs right now, and they are > > val rdd1 = sc.parallelize(List((1,(3)), (2,(5)), (3,(6)))) > val rdd2 = sc.parallelize(List((2,(1)), (2,(3)), (3,(9)))) > > I want to combine rdd1 and rdd2 to get rdd3 which looks like > > List((1,(3)), (2,(5,1)), (2,(5,3)), (3, (6,9))) > > The order in _._2 does not matter, so you can treat it as a Set. > > I tried to use zip, but since there is no guarantee that the length of rdd1 > and rdd2 will be the same I do not know if it is doable. > > Also I looked into PairedRDD, some people use union operation on two RDDs > and then apply a map function on it. Since I want all combinations > according > to _._1, I do not know how to achieve it by union and map. > > Thanks in advance! > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/New-combination-like-RDD-based-on-two-RDDs-tp21508.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- Thanks, www.openkb.info (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
New combination-like RDD based on two RDDs
Hey Spark gurus! Sorry for the confusing title. I do not know the exact description of my problem; if you know it, please tell me so I can change it :-) Say I have two RDDs right now, and they are

val rdd1 = sc.parallelize(List((1,(3)), (2,(5)), (3,(6))))
val rdd2 = sc.parallelize(List((2,(1)), (2,(3)), (3,(9))))

I want to combine rdd1 and rdd2 to get rdd3 which looks like

List((1,(3)), (2,(5,1)), (2,(5,3)), (3,(6,9)))

The order in _._2 does not matter, so you can treat it as a Set. I tried to use zip, but since there is no guarantee that the length of rdd1 and rdd2 will be the same I do not know if it is doable. Also I looked into PairedRDD; some people use a union operation on two RDDs and then apply a map function on it. Since I want all combinations according to _._1, I do not know how to achieve it by union and map. Thanks in advance! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/New-combination-like-RDD-based-on-two-RDDs-tp21508.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Joining piped RDDs
Hi! We have a common use case with Spark - we go out to some database, e.g. Cassandra, crunch through all of its data, but along the RDD pipeline we use a pipe operator to some script. All the data before the pipe has some unique IDs, but inside the pipe everything is lost. The only current solution we have is to format the data into the pipe, so it includes the ids, and then restore it all in a map after the pipe. However it would be much nicer if we could just join/zip back the output of the pipe. However we can’t cache the RDDs, so it would be nice to have a forkRDD of some sort that only keeps the last partition in cache (since we’re guaranteed that there’ll be a zip later on and the dataflow will be synchronized). Or maybe we can already do this in Spark? Thank you, Pavel Velikhov Chief Science Officer TopRater - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
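For reference, a rough sketch of the format-the-ID-into-the-pipe workaround described above (the record type, field names, and script path are all made up, and the script is assumed to echo the first column back unchanged):

import org.apache.spark.rdd.RDD

case class Record(id: Long, payload: String)
val records: RDD[Record] = sc.parallelize(Seq(Record(1L, "a"), Record(2L, "b")))
val piped: RDD[String] = records
  .map(r => s"${r.id}\t${r.payload}") // format the ID into the pipe input
  .pipe("./process.sh")               // assumed to leave the first column intact
val restored: RDD[(Long, String)] = piped.map { line =>
  val Array(id, rest) = line.split("\t", 2)
  (id.toLong, rest)                   // restore the ID after the pipe
}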
Re: Trying to find where Spark persists RDDs when run with YARN
These will be under the working directory of the YARN container running the executor. I don't have it handy but think it will also be a "spark-local" or similar directory. On Sun, Jan 18, 2015 at 2:50 PM, Hemanth Yamijala wrote: > Hi, > > I am trying to find where Spark persists RDDs when we call the persist() api > and executed under YARN. This is purely for understanding... > > In my driver program, I wait indefinitely, so as to avoid any clean up > problems. > > In the actual job, I roughly do the following: > > JavaRDD lines = context.textFile(args[0]); > lines.persist(StorageLevel.DISK_ONLY()); > lines.collect(); > > When run with local executor, I can see that the files (like rdd_1_0) are > persisted under directories like > /var/folders/mt/51srrjc15wl3n829qkgnh2dmgp/T/spark-local-20150118201458-6147/15. > > Where similarly can I find these under Yarn ? > > Thanks > hemanth - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Trying to find where Spark persists RDDs when run with YARN
Hi, I am trying to find where Spark persists RDDs when we call the persist() api and executed under YARN. This is purely for understanding... In my driver program, I wait indefinitely, so as to avoid any clean up problems. In the actual job, I roughly do the following: JavaRDD lines = context.textFile(args[0]); lines.persist(StorageLevel.DISK_ONLY()); lines.collect(); When run with local executor, I can see that the files (like rdd_1_0) are persisted under directories like /var/folders/mt/51srrjc15wl3n829qkgnh2dmgp/T/spark-local-20150118201458-6147/15. Where similarly can I find these under Yarn ? Thanks hemanth
Re: Zipping RDDs of equal size not possible
"sample 2 * n tuples, split them into two parts, balance the sizes of these parts by filtering some tuples out" How do you guarantee that the two RDDs have the same size? -Xiangrui On Fri, Jan 9, 2015 at 3:40 AM, Niklas Wilcke <1wil...@informatik.uni-hamburg.de> wrote: > Hi Spark community, > > I have a problem with zipping two RDDs of the same size and same number of > partitions. > The error message says that zipping is only allowed on RDDs which are > partitioned into chunks of exactly the same sizes. > How can I assure this? My workaround at the moment is to repartition both > RDDs to only one partition but that obviously > does not scale. > > This problem originates from my problem to draw n random tuple pairs (Tuple, > Tuple) from an RDD[Tuple]. > What I do is to sample 2 * n tuples, split them into two parts, balance the > sizes of these parts > by filtering some tuples out and zipping them together. > > I would appreciate to read better approaches for both problems. > > Thanks in advance, > Niklas - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Zipping RDDs of equal size not possible
Hi Spark community, I have a problem with zipping two RDDs of the same size and same number of partitions. The error message says that zipping is only allowed on RDDs which are partitioned into chunks of exactly the same sizes. How can I assure this? My workaround at the moment is to repartition both RDDs to only one partition but that obviously does not scale. This problem originates from my problem to draw n random tuple pairs (Tuple, Tuple) from an RDD[Tuple]. What I do is to sample 2 * n tuples, split them into two parts, balance the sizes of these parts by filtering some tuples out and zipping them together. I would appreciate to read better approaches for both problems. Thanks in advance, Niklas
Re: Join RDDs with DStreams
Here's how you do it:

val joined_stream = myStream.transform((x: RDD[(String, String)]) => {
  val prdd = new PairRDDFunctions[String, String](x)
  prdd.join(myRDD)
})

Thanks Best Regards On Thu, Jan 8, 2015 at 10:20 PM, Asim Jalis wrote: > Is there a way to join non-DStream RDDs with DStream RDDs? > > Here is the use case. I have a lookup table stored in HDFS that I want to > read as an RDD. Then I want to join it with the RDDs that are coming in > through the DStream. How can I do this? > > Thanks. > > Asim >
Re: Join RDDs with DStreams
You are looking for dstream.transform(rdd => rdd.join(otherRdd)). The docs contain an example of how to use transform. https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams -kr, Gerard. On Thu, Jan 8, 2015 at 5:50 PM, Asim Jalis wrote: > Is there a way to join non-DStream RDDs with DStream RDDs? > > Here is the use case. I have a lookup table stored in HDFS that I want to > read as an RDD. Then I want to join it with the RDDs that are coming in > through the DStream. How can I do this? > > Thanks. > > Asim >
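A self-contained sketch of that pattern (the HDFS path is a placeholder, and `events` stands in for whatever keyed DStream the job already has): load the lookup table once as an RDD on the driver, then join it against every batch inside transform.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

val lookup: RDD[(String, String)] = sc.textFile("hdfs:///tables/lookup.tsv")
  .map { line => val Array(k, v) = line.split("\t", 2); (k, v) }

// join the static lookup RDD against each micro-batch of the stream
def joinWithLookup(events: DStream[(String, String)]): DStream[(String, (String, String))] =
  events.transform(rdd => rdd.join(lookup))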
Join RDDs with DStreams
Is there a way to join non-DStream RDDs with DStream RDDs? Here is the use case. I have a lookup table stored in HDFS that I want to read as an RDD. Then I want to join it with the RDDs that are coming in through the DStream. How can I do this? Thanks. Asim
Re: Strange DAG scheduling behavior on currently dependent RDDs
I asked this question too soon. I am caching a bunch of RDDs in a TrieMap so that our framework can wire them together, and the locking was not completely correct; therefore it was at times creating multiple new RDDs instead of using the cached versions, which created completely separate lineages. What's strange is that this bug only surfaced when I updated Spark. On Wed, Jan 7, 2015 at 9:12 AM, Corey Nolet wrote: > We just updated to Spark 1.2.0 from Spark 1.1.0. We have a small framework > that we've been developing that connects various different RDDs together > based on some predefined business cases. After updating to 1.2.0, some of > the concurrency expectations about how the stages within jobs are executed > have changed quite significantly. > > Given 3 RDDs: > > RDD1 = inputFromHDFS().groupBy().sortBy().etc().cache() > RDD2 = RDD1.outputToFile > RDD3 = RDD1.groupBy().outputToFile > > In Spark 1.1.0, we expected RDD1 to be scheduled based on the first stage > encountered (RDD2's outputToFile or RDD3's groupBy()) and then for RDD2 and > RDD3 to both block waiting for RDD1 to complete and cache- at which point > RDD2 and RDD3 both use the cached version to complete their work. > > Spark 1.2.0 seems to schedule two (be it concurrently running) stages for > each of RDD1's stages (inputFromHDFS, groupBy(), sortBy(), etc() will each > get run twice). It does not look like there is any sharing of the results > between these jobs. > > Are we doing something wrong? Is there a setting that I'm not > understanding somewhere? >
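For anyone hitting the same race: a sketch of one way to make such a cache safe (this is my own illustration of the general fix, not Corey's actual code) -- publish the RDD with putIfAbsent, so two threads can never register different lineages under the same key.

import scala.collection.concurrent.TrieMap
import org.apache.spark.rdd.RDD

val rddCache = TrieMap.empty[String, RDD[_]]

def getOrBuild[T](key: String)(build: => RDD[T]): RDD[T] =
  rddCache.get(key) match {
    case Some(existing) => existing.asInstanceOf[RDD[T]]
    case None =>
      val built = build.cache()
      // putIfAbsent is atomic: if another thread won the race, reuse its RDD
      rddCache.putIfAbsent(key, built).getOrElse(built).asInstanceOf[RDD[T]]
  }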
Re: How to merge a RDD of RDDs into one uber RDD
Yup, i meant union only. On Wed, Jan 7, 2015, 16:19 Sean Owen wrote: > I think you mean union(). Yes, you could also simply make an RDD for each > file, and use SparkContext.union() to put them together. > > On Wed, Jan 7, 2015 at 9:51 AM, Raghavendra Pandey < > raghavendra.pan...@gmail.com> wrote: > >> You can also use join function of rdd. This is actually kind of append >> funtion that add up all the rdds and create one uber rdd. >> >> On Wed, Jan 7, 2015, 14:30 rkgurram wrote: >> >>> Thank you for the response, sure will try that out. >>> >>> Currently I changed my code such that the first map "files.map" to >>> "files.flatMap", which I guess will do similar what you are saying, it >>> gives >>> me a List[] of elements (in this case LabeledPoints, I could also do >>> RDDs) >>> which I then turned into a mega RDD. The current problem seems to be >>> gone, I >>> no longer get the NPE but further down I am getting a indexOutOfBounds, >>> so >>> trying to figure out if the original problem is manifesting itself as a >>> new >>> one. >>> >>> >>> Regards >>> -Ravi >>> >>> >>> >>> >>> -- >>> View this message in context: http://apache-spark-user-list. >>> 1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one- >>> uber-RDD-tp20986p21012.html >>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>> >>> - >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >>> >
Strange DAG scheduling behavior on currently dependent RDDs
We just updated to Spark 1.2.0 from Spark 1.1.0. We have a small framework that we've been developing that connects various different RDDs together based on some predefined business cases. After updating to 1.2.0, some of the concurrency expectations about how the stages within jobs are executed have changed quite significantly. Given 3 RDDs:

RDD1 = inputFromHDFS().groupBy().sortBy().etc().cache()
RDD2 = RDD1.outputToFile
RDD3 = RDD1.groupBy().outputToFile

In Spark 1.1.0, we expected RDD1 to be scheduled based on the first stage encountered (RDD2's outputToFile or RDD3's groupBy()) and then for RDD2 and RDD3 to both block waiting for RDD1 to complete and cache, at which point RDD2 and RDD3 both use the cached version to complete their work. Spark 1.2.0 seems to schedule two (albeit concurrently running) stages for each of RDD1's stages (inputFromHDFS, groupBy(), sortBy(), etc() will each get run twice). It does not look like there is any sharing of the results between these jobs. Are we doing something wrong? Is there a setting that I'm not understanding somewhere?
Re: How to merge a RDD of RDDs into one uber RDD
I think you mean union(). Yes, you could also simply make an RDD for each file, and use SparkContext.union() to put them together. On Wed, Jan 7, 2015 at 9:51 AM, Raghavendra Pandey < raghavendra.pan...@gmail.com> wrote: > You can also use join function of rdd. This is actually kind of append > funtion that add up all the rdds and create one uber rdd. > > On Wed, Jan 7, 2015, 14:30 rkgurram wrote: > >> Thank you for the response, sure will try that out. >> >> Currently I changed my code such that the first map "files.map" to >> "files.flatMap", which I guess will do similar what you are saying, it >> gives >> me a List[] of elements (in this case LabeledPoints, I could also do RDDs) >> which I then turned into a mega RDD. The current problem seems to be >> gone, I >> no longer get the NPE but further down I am getting a indexOutOfBounds, so >> trying to figure out if the original problem is manifesting itself as a >> new >> one. >> >> >> Regards >> -Ravi >> >> >> >> >> -- >> View this message in context: http://apache-spark-user-list. >> 1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one- >> uber-RDD-tp20986p21012.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >>
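A minimal sketch of that suggestion (the paths are placeholders I made up): collect the file names on the driver, build one RDD per file, and merge them with SparkContext.union.

val fileNames = Seq("hdfs:///data/part-00000", "hdfs:///data/part-00001")
val perFile = fileNames.map(name => sc.textFile(name))
val uber = sc.union(perFile) // one RDD over all files, no nesting required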
Re: How to merge a RDD of RDDs into one uber RDD
You can also use the union function of rdd. This is actually kind of an append function that adds up all the rdds and creates one uber rdd. On Wed, Jan 7, 2015, 14:30 rkgurram wrote: > Thank you for the response, sure will try that out. > > Currently I changed my code such that the first map "files.map" to > "files.flatMap", which I guess will do similar what you are saying, it > gives > me a List[] of elements (in this case LabeledPoints, I could also do RDDs) > which I then turned into a mega RDD. The current problem seems to be gone, > I > no longer get the NPE but further down I am getting a indexOutOfBounds, so > trying to figure out if the original problem is manifesting itself as a new > one. > > > Regards > -Ravi > > > > > -- > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-uber-RDD-tp20986p21012.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Re: How to merge a RDD of RDDs into one uber RDD
Thank you for the response, sure will try that out. Currently I changed my code such that the first map "files.map" became "files.flatMap", which I guess will do something similar to what you are saying; it gives me a List[] of elements (in this case LabeledPoints, I could also do RDDs) which I then turned into a mega RDD. The current problem seems to be gone, I no longer get the NPE, but further down I am getting an indexOutOfBounds, so I am trying to figure out if the original problem is manifesting itself as a new one. Regards -Ravi -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-uber-RDD-tp20986p21012.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to merge a RDD of RDDs into one uber RDD
an RDD cannot contain elements of type RDD. (i.e. you can't nest RDDs within RDDs, in fact, I don't think it makes any sense) I suggest rather than having an RDD of file names, collect those file name strings back on to the driver as a Scala array of file names, and then from there, make an array of RDDs from which you can fold over them and merge them. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-merge-a-RDD-of-RDDs-into-one-uber-RDD-tp20986p21007.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Cannot see RDDs in Spark UI
Hi Manoj, I've noticed that the storage tab only shows RDDs that have been cached. Did you call .cache() or .persist() on any of the RDDs? Andrew On Tue, Jan 6, 2015 at 6:48 PM, Manoj Samel wrote: > Hi, > > I create a bunch of RDDs, including schema RDDs. When I run the program > and go to UI on xxx:4040, the storage tab does not shows any RDDs. > > > Spark version is 1.1.1 (Hadoop 2.3) > > Any thoughts? > > Thanks, >
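A quick way to check this in the shell (a sketch; the RDD name is arbitrary) -- note that persist alone is lazy, so the Storage tab only fills in after an action has run:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 1000).setName("myRdd")
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.count() // after this action, the Storage tab at xxx:4040 should list "myRdd"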
Cannot see RDDs in Spark UI
Hi, I create a bunch of RDDs, including schema RDDs. When I run the program and go to the UI on xxx:4040, the storage tab does not show any RDDs. Spark version is 1.1.1 (Hadoop 2.3) Any thoughts? Thanks,
Exception after changing RDDs
Hi All, We are getting an exception after we add one RDD to another RDD. We first declared an empty RDD "A", then received a new DStream "B" from Kafka; for each RDD in the DStream "B", we kept adding them to the existing RDD "A". The error happened when we were trying to use the updated RDD "A". Could anybody give some hints about what's going wrong here? Thanks! Sample Code:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("newAPP")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

// Declare an empty RDD for A
@volatile var rddA: RDD[A] = ssc.sparkContext.emptyRDD[A]

// Receives A and B data from Kafka
val kafkaMessages = KafkaUtils.createStream(ssc, "localhost:2181", "CONSUMERGROUP", Map("TOPIC" -> 1)).map(_._2)...
val dstreamA: DStream[A] = kafkaMessages.map(msg => JSONConverter.fromJSON[A](msg)).flatMap(_.toOption)...
val dstreamB: DStream[B] = kafkaMessages.map(msg => JSONConverter.fromJSON[B](msg)).flatMap(_.toOption)...

// Updates RDD A from DStream A
dstreamA.foreachRDD(rdd => rddA = rddA ++ rdd)

// Joins DStream B with the existing RDD A
val results: DStream[(A, B)] = dstreamB.transform(rdd => {
  val id_A: RDD[(String, A)] = rddA.keyBy(_.id)
  val id_B: RDD[(String, B)] = rdd.keyBy(_.id)
  val rddA_B: RDD[(A, B)] = id_A.join(id_B).map { case (id, (a, b)) => (a, b) }
  rddA_B
})

From my observation, the exception happens when we were doing the rddA.keyBy(_.id). Exception Output:

14/12/23 15:35:23 WARN storage.BlockManager: Block input-0-1419377723400 already exists on this machine; not re-adding it
14/12/23 15:35:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/23 15:35:34 WARN storage.BlockManager: Block input-0-1419377734600 already exists on this machine; not re-adding it
14/12/23 15:35:37 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext
org.apache.spark.SparkException: Attempted to use BlockRDD[5] at BlockRDD at ReceiverInputDStream.scala:69 after its blocks have been removed!
at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:216)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:216)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:215)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1297)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1307)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1306)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1304)
at scala.collection.immutable.List.foreach(List.scala:318)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-after-changing-RDDs-tp20841.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
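One mitigation worth trying (an untested sketch on my part, reusing the ssc, rddA, and dstreamA names from the sample code above, with a made-up checkpoint path): checkpoint the accumulated RDD so its lineage stops pointing at the streaming BlockRDDs whose blocks the receiver eventually drops.

import org.apache.spark.storage.StorageLevel

ssc.sparkContext.setCheckpointDir("hdfs:///tmp/rdd-checkpoints") // hypothetical path
dstreamA.foreachRDD { rdd =>
  rddA = rddA.union(rdd)
  rddA.persist(StorageLevel.MEMORY_AND_DISK)
  rddA.checkpoint()
  rddA.count() // materializes the checkpoint, truncating the lineage
}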
Re: UNION two RDDs
Hi Sean and Madhu, Thank you for the explanation. I really appreciate it. Best Regards, Jerry On Fri, Dec 19, 2014 at 4:50 AM, Sean Owen wrote: > coalesce actually changes the number of partitions. Unless the > original RDD had just 1 partition, coalesce(1) will make an RDD with 1 > partition that is larger than the original partitions, of course. > > I don't think the question is about ordering of things within an > element of the RDD? > > If the original RDD was sorted, and so has a defined ordering, then it > will be preserved. Otherwise I believe you do not have any guarantees > about ordering. In practice, you may find that you still encounter the > elements in the same order after coalesce(1), although I am not sure > that is even true. > > union() is the same story; unless the RDDs are sorted I don't think > there are guarantees. However I'm almost certain that in practice, as > it happens now, A's elements would come before B's after a union, if > you did traverse them. > > On Fri, Dec 19, 2014 at 5:41 AM, madhu phatak > wrote: > > Hi, > > coalesce is an operation which changes no of records in a partition. It > will > > not touch ordering with in a row AFAIK. > > > > On Fri, Dec 19, 2014 at 2:22 AM, Jerry Lam wrote: > >> > >> Hi Spark users, > >> > >> I wonder if val resultRDD = RDDA.union(RDDB) will always have records in > >> RDDA before records in RDDB. > >> > >> Also, will resultRDD.coalesce(1) change this ordering? > >> > >> Best Regards, > >> > >> Jerry > > > > > > > > -- > > Regards, > > Madhukara Phatak > > http://www.madhukaraphatak.com >
Re: spark-shell bug with RDDs and case classes?
AFAIK it's a known issue of some sort in the Scala REPL, which is what the Spark REPL is. The PR that was closed was just adding tests to show it's a bug. I don't know if there is any workaround now. On Fri, Dec 19, 2014 at 7:21 PM, Jay Hutfles wrote: > Found a problem in the spark-shell, but can't confirm that it's related to > open issues on Spark's JIRA page. I was wondering if anyone could help > identify if this is an issue or if it's already being addressed. > > Test: (in spark-shell) > case class Person(name: String, age: Int) > val peopleList = List(Person("Alice", 35), Person("Bob", 47), > Person("Alice", 35), Person("Bob", 15)) > val peopleRDD = sc.parallelize(peopleList) > assert(peopleList.distinct.size == peopleRDD.distinct.count) > > > At first I thought it was related to issue SPARK-2620 > (https://issues.apache.org/jira/browse/SPARK-2620), which says case classes > can't be used as keys in spark-shell due to how case classes are compiled by > the REPL. It lists .reduceByKey, .groupByKey and .distinct as being > affected. But the associated pull request for adding tests to cover this > (https://github.com/apache/spark/pull/1588) was closed. > > Is this something I just have to live with when using the REPL? Or is this > covered by something bigger that's being addressed? > > Thanks in advance > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-bug-with-RDDs-and-case-classes-tp20789.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
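One practical sidestep (my suggestion, not something the thread confirms as a fix): avoid case-class keys in spark-shell by keying on plain tuples, which compare by value regardless of how the REPL compiles classes.

val peopleList = List(("Alice", 35), ("Bob", 47), ("Alice", 35), ("Bob", 15))
val peopleRDD = sc.parallelize(peopleList)
// with tuples instead of the Person case class, the counts agree in the REPL
assert(peopleList.distinct.size == peopleRDD.distinct.count)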
spark-shell bug with RDDs and case classes?
Found a problem in the spark-shell, but can't confirm that it's related to open issues on Spark's JIRA page. I was wondering if anyone could help identify if this is an issue or if it's already being addressed. Test: (in spark-shell) case class Person(name: String, age: Int) val peopleList = List(Person("Alice", 35), Person("Bob", 47), Person("Alice", 35), Person("Bob", 15)) val peopleRDD = sc.parallelize(peopleList) assert(peopleList.distinct.size == peopleRDD.distinct.count) At first I thought it was related to issue SPARK-2620 (https://issues.apache.org/jira/browse/SPARK-2620), which says case classes can't be used as keys in spark-shell due to how case classes are compiled by the REPL. It lists .reduceByKey, .groupByKey and .distinct as being affected. But the associated pull request for adding tests to cover this (https://github.com/apache/spark/pull/1588) was closed. Is this something I just have to live with when using the REPL? Or is this covered by something bigger that's being addressed? Thanks in advance -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-bug-with-RDDs-and-case-classes-tp20789.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: UNION two RDDs
coalesce actually changes the number of partitions. Unless the original RDD had just 1 partition, coalesce(1) will make an RDD with 1 partition that is larger than the original partitions, of course. I don't think the question is about ordering of things within an element of the RDD? If the original RDD was sorted, and so has a defined ordering, then it will be preserved. Otherwise I believe you do not have any guarantees about ordering. In practice, you may find that you still encounter the elements in the same order after coalesce(1), although I am not sure that is even true. union() is the same story; unless the RDDs are sorted I don't think there are guarantees. However I'm almost certain that in practice, as it happens now, A's elements would come before B's after a union, if you did traverse them. On Fri, Dec 19, 2014 at 5:41 AM, madhu phatak wrote: > Hi, > coalesce is an operation which changes no of records in a partition. It will > not touch ordering with in a row AFAIK. > > On Fri, Dec 19, 2014 at 2:22 AM, Jerry Lam wrote: >> >> Hi Spark users, >> >> I wonder if val resultRDD = RDDA.union(RDDB) will always have records in >> RDDA before records in RDDB. >> >> Also, will resultRDD.coalesce(1) change this ordering? >> >> Best Regards, >> >> Jerry > > > > -- > Regards, > Madhukara Phatak > http://www.madhukaraphatak.com - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
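To make the guarantees (and non-guarantees) above concrete, here is a minimal spark-shell sketch; it is an illustration added for this archive, not from the thread, and assumes a Spark version with RDD.sortBy:

val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(4, 5, 6))
val u = a.union(b)            // no shuffle; a's partitions are listed before b's
u.coalesce(1).collect()       // often Array(1, 2, 3, 4, 5, 6) in practice, but not a documented contract
u.sortBy(identity).collect()  // Array(1, 2, 3, 4, 5, 6), the only portable way to guarantee an order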
Re: UNION two RDDs
Hi, coalesce is an operation which changes the number of records in a partition. It will not touch ordering within a row AFAIK. On Fri, Dec 19, 2014 at 2:22 AM, Jerry Lam wrote: > > Hi Spark users, > > I wonder if val resultRDD = RDDA.union(RDDB) will always have records in > RDDA before records in RDDB. > > Also, will resultRDD.coalesce(1) change this ordering? > > Best Regards, > > Jerry > -- Regards, Madhukara Phatak http://www.madhukaraphatak.com
UNION two RDDs
Hi Spark users, I wonder if val resultRDD = RDDA.union(RDDB) will always have records in RDDA before records in RDDB. Also, will resultRDD.coalesce(1) change this ordering? Best Regards, Jerry
Re: RDDs being cleaned too fast
RDD.persist() can be useful here. On 11 December 2014 at 14:34, ankits [via Apache Spark User List] < ml-node+s1001560n20613...@n3.nabble.com> wrote: > > I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too > fast. How can i inspect the size of RDD in memory and get more information > about why it was cleaned up. There should be more than enough memory > available on the cluster to store them, and by default, the > spark.cleaner.ttl is infinite, so I want more information about why this is > happening and how to prevent it. > > Spark just logs this when removing RDDs: > > [2014-12-11 01:19:34,006] INFO spark.storage.BlockManager [] [] - > Removing RDD 33 > [2014-12-11 01:19:34,010] INFO pache.spark.ContextCleaner [] > [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33 > [2014-12-11 01:19:34,012] INFO spark.storage.BlockManager [] [] - > Removing RDD 33 > [2014-12-11 01:19:34,016] INFO pache.spark.ContextCleaner [] > [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33 > -- Regards, Harihar Nahak BigData Developer Wynyard Email:hna...@wynyardgroup.com | Extn: 8019 - --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613p20738.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: RDDs being cleaned too fast
I was having similar issues with my persistent RDDs. After some digging around, I noticed that the partitions were not balanced evenly across the available nodes. After a "repartition", the RDD was spread evenly across all available memory. Not sure if that is something that would help your use-case though. You could also increase the spark.storage.memoryFraction if that is an option. - Ranga On Wed, Dec 10, 2014 at 10:23 PM, Aaron Davidson wrote: > The ContextCleaner uncaches RDDs that have gone out of scope on the > driver. So it's possible that the given RDD is no longer reachable in your > program's control flow, or else it'd be a bug in the ContextCleaner. > > On Wed, Dec 10, 2014 at 5:34 PM, ankits wrote: > >> I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too >> fast. >> How can i inspect the size of RDD in memory and get more information about >> why it was cleaned up. There should be more than enough memory available >> on >> the cluster to store them, and by default, the spark.cleaner.ttl is >> infinite, so I want more information about why this is happening and how >> to >> prevent it. >> >> Spark just logs this when removing RDDs: >> >> [2014-12-11 01:19:34,006] INFO spark.storage.BlockManager [] [] - >> Removing >> RDD 33 >> [2014-12-11 01:19:34,010] INFO pache.spark.ContextCleaner [] >> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33 >> [2014-12-11 01:19:34,012] INFO spark.storage.BlockManager [] [] - >> Removing >> RDD 33 >> [2014-12-11 01:19:34,016] INFO pache.spark.ContextCleaner [] >> [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33 >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> >
Re: Locking for shared RDDs
Aditya, I think you have the mental model of Spark Streaming a little off the mark. Unlike traditional streaming systems, where any kind of state is mutable, Spark Streaming is designed on Spark's immutable RDDs. Streaming data is received and divided into immutable blocks, which then form immutable RDDs, and then transformations form new immutable RDDs. It's best that you first read the Spark paper and then the Spark Streaming paper to understand the model. Once you understand that, you will realize that since everything is immutable, the question of consistency does not even arise :) TD On Mon, Dec 8, 2014 at 9:44 PM, Raghavendra Pandey wrote: > You don't need to worry about locks as such as one thread/worker is > responsible exclusively for one partition of the RDD. You can use > Accumulator variables that spark provides to get the state updates. > > > On Mon Dec 08 2014 at 8:14:28 PM aditya.athalye > wrote: >> >> I am relatively new to Spark. I am planning to use Spark Streaming for my >> OLAP use case, but I would like to know how RDDs are shared between >> multiple >> workers. >> If I need to constantly compute some stats on the streaming data, >> presumably >> shared state would have to updated serially by different spark workers. Is >> this managed by Spark automatically or does the application need to ensure >> distributed locks are acquired? >> >> Thanks >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: RDDs being cleaned too fast
The ContextCleaner uncaches RDDs that have gone out of scope on the driver. So it's possible that the given RDD is no longer reachable in your program's control flow, or else it'd be a bug in the ContextCleaner. On Wed, Dec 10, 2014 at 5:34 PM, ankits wrote: > I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too > fast. > How can i inspect the size of RDD in memory and get more information about > why it was cleaned up. There should be more than enough memory available on > the cluster to store them, and by default, the spark.cleaner.ttl is > infinite, so I want more information about why this is happening and how to > prevent it. > > Spark just logs this when removing RDDs: > > [2014-12-11 01:19:34,006] INFO spark.storage.BlockManager [] [] - Removing > RDD 33 > [2014-12-11 01:19:34,010] INFO pache.spark.ContextCleaner [] > [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33 > [2014-12-11 01:19:34,012] INFO spark.storage.BlockManager [] [] - Removing > RDD 33 > [2014-12-11 01:19:34,016] INFO pache.spark.ContextCleaner [] > [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33 > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
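A sketch of the practical implication (an illustration, not from the thread; `input` and `expensiveTransform` are placeholders for your own pipeline): persist the RDD and keep a driver-side reference alive for as long as the cached blocks are needed.

import org.apache.spark.storage.StorageLevel

val cached = input.map(expensiveTransform).persist(StorageLevel.MEMORY_ONLY)
cached.count()  // materialize the cached blocks once
// reuse `cached` in later jobs; while this reference stays reachable in
// driver code, the ContextCleaner has no reason to remove its blocks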
RDDs being cleaned too fast
I'm using spark 1.1.0 and am seeing persisted RDDs being cleaned up too fast. How can i inspect the size of RDD in memory and get more information about why it was cleaned up. There should be more than enough memory available on the cluster to store them, and by default, the spark.cleaner.ttl is infinite, so I want more information about why this is happening and how to prevent it. Spark just logs this when removing RDDs: [2014-12-11 01:19:34,006] INFO spark.storage.BlockManager [] [] - Removing RDD 33 [2014-12-11 01:19:34,010] INFO pache.spark.ContextCleaner [] [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33 [2014-12-11 01:19:34,012] INFO spark.storage.BlockManager [] [] - Removing RDD 33 [2014-12-11 01:19:34,016] INFO pache.spark.ContextCleaner [] [akka://JobServer/user/context-supervisor/job-context1] - Cleaned RDD 33 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-being-cleaned-too-fast-tp20613.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Caching RDDs with shared memory - bug or feature?
If all RDD elements within a partition contain pointers to a single shared object, Spark persists as expected when the RDD is small. However, if the RDD has more than 200 elements then Spark reports requiring much more memory than it actually does. This becomes a problem for large RDDs, as Spark refuses to persist even though it can. Is this a bug or is there a feature that I'm missing? Cheers, Luke

val n = ???
class Elem(val s: Array[Int])
val rdd = sc.parallelize(Seq(1)).mapPartitions( _ => {
  val sharedArray = Array.ofDim[Int](10000000)  // Should require ~40MB
  (1 to n).toIterator.map(_ => new Elem(sharedArray))
}).cache().count()  // force computation

For n = 100: MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 38.1 MB, free 898.7 MB)
For n = 200: MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 38.2 MB, free 898.7 MB)
For n = 201: MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 76.7 MB, free 860.2 MB)
For n = 5000: MemoryStore: Not enough space to cache rdd_1_0 in memory! (computed 781.3 MB so far)

Note: For medium-sized n (where n > 200 but Spark can still cache), the actual application memory still stays where it should; Spark just seems to vastly overreport how much memory it's using. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Caching-RDDs-with-shared-memory-bug-or-feature-tp20596.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Locking for shared RDDs
You don't need to worry about locks as such, as one thread/worker is responsible exclusively for one partition of the RDD. You can use Accumulator variables that Spark provides to get the state updates. On Mon Dec 08 2014 at 8:14:28 PM aditya.athalye wrote: > I am relatively new to Spark. I am planning to use Spark Streaming for my > OLAP use case, but I would like to know how RDDs are shared between > multiple > workers. > If I need to constantly compute some stats on the streaming data, > presumably > shared state would have to updated serially by different spark workers. Is > this managed by Spark automatically or does the application need to ensure > distributed locks are acquired? > > Thanks > > > > -- > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
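For the "constantly compute some stats" part of the question, a minimal accumulator sketch (illustrative only; `stream` stands in for your DStream, and the Spark 1.x accumulator API is assumed):

val eventCount = sc.accumulator(0)
stream.foreachRDD { rdd =>
  rdd.foreach(_ => eventCount += 1)  // per-task updates are merged by Spark
}
// read eventCount.value on the driver; no application-level locking is needed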
Locking for shared RDDs
I am relatively new to Spark. I am planning to use Spark Streaming for my OLAP use case, but I would like to know how RDDs are shared between multiple workers. If I need to constantly compute some stats on the streaming data, presumably shared state would have to updated serially by different spark workers. Is this managed by Spark automatically or does the application need to ensure distributed locks are acquired? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Locking-for-shared-RDDs-tp20578.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running two different Spark jobs vs multi-threading RDDs
You can actually submit multiple jobs to a single SparkContext in different threads. In the case you mentioned with 2 stages having a common parent, both will wait for the parent stage to complete and then the two will execute in parallel, sharing the cluster resources. Solutions that submit multiple applications are also reasonable, but then you have to manage the job dependencies yourself. On Sat, Dec 6, 2014 at 8:41 AM, Corey Nolet wrote: > Reading the documentation a little more closely, I'm using the wrong > terminology. I'm using stages to refer to what spark is calling a job. I > guess application (more than one spark context) is what I'm asking about > On Dec 5, 2014 5:19 PM, "Corey Nolet" wrote: > >> I've read in the documentation that RDDs can be run concurrently when >> submitted in separate threads. I'm curious how the scheduler would handle >> propagating these down to the tasks. >> >> I have 3 RDDs: >> - one RDD which loads some initial data, transforms it and caches it >> - two RDDs which use the cached RDD to provide reports >> >> I'm trying to figure out how the resources will be scheduled to perform >> these stages if I were to concurrently run the two RDDs that depend on the >> first RDD. Would the two RDDs run sequentially? Will they both run @ the >> same time and be smart about how they are caching? >> >> Would this be a time when I'd want to use Tachyon instead and run this as >> 2 separate physical jobs: one to place the shared data in the RAMDISK and >> one to run the two dependent RDDs concurrently? Or would it even be best in >> that case to run 3 completely separate jobs? >> >> We're planning on using YARN so there's 2 levels of scheduling going on. >> We're trying to figure out the best way to utilize the resources so that we >> are fully saturating the system and making sure there's constantly work >> being done rather than anything spinning gears waiting on upstream >> processing to occur (in mapreduce, we'd just submit a ton of jobs and have >> them wait in line). >> >
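A sketch of the threaded-submission approach described above (an illustration; the paths and filter predicates are placeholders):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val base = sc.textFile("hdfs:///data/input").cache()  // the shared parent
val reportA = Future { base.filter(_.contains("A")).saveAsTextFile("hdfs:///out/a") }
val reportB = Future { base.filter(_.contains("B")).saveAsTextFile("hdfs:///out/b") }
Await.result(Future.sequence(Seq(reportA, reportB)), Duration.Inf)
// both jobs reuse the cached parent; setting spark.scheduler.mode=FAIR
// additionally lets them share executors more evenly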
Re: Running two different Spark jobs vs multi-threading RDDs
Reading the documentation a little more closely, I'm using the wrong terminology. I'm using stages to refer to what spark is calling a job. I guess application (more than one spark context) is what I'm asking about On Dec 5, 2014 5:19 PM, "Corey Nolet" wrote: > I've read in the documentation that RDDs can be run concurrently when > submitted in separate threads. I'm curious how the scheduler would handle > propagating these down to the tasks. > > I have 3 RDDs: > - one RDD which loads some initial data, transforms it and caches it > - two RDDs which use the cached RDD to provide reports > > I'm trying to figure out how the resources will be scheduled to perform > these stages if I were to concurrently run the two RDDs that depend on the > first RDD. Would the two RDDs run sequentially? Will they both run @ the > same time and be smart about how they are caching? > > Would this be a time when I'd want to use Tachyon instead and run this as > 2 separate physical jobs: one to place the shared data in the RAMDISK and > one to run the two dependent RDDs concurrently? Or would it even be best in > that case to run 3 completely separate jobs? > > We're planning on using YARN so there's 2 levels of scheduling going on. > We're trying to figure out the best way to utilize the resources so that we > are fully saturating the system and making sure there's constantly work > being done rather than anything spinning gears waiting on upstream > processing to occur (in mapreduce, we'd just submit a ton of jobs and have > them wait in line). >
Running two different Spark jobs vs multi-threading RDDs
I've read in the documentation that RDDs can be run concurrently when submitted in separate threads. I'm curious how the scheduler would handle propagating these down to the tasks. I have 3 RDDs: - one RDD which loads some initial data, transforms it and caches it - two RDDs which use the cached RDD to provide reports I'm trying to figure out how the resources will be scheduled to perform these stages if I were to concurrently run the two RDDs that depend on the first RDD. Would the two RDDs run sequentially? Will they both run @ the same time and be smart about how they are caching? Would this be a time when I'd want to use Tachyon instead and run this as 2 separate physical jobs: one to place the shared data in the RAMDISK and one to run the two dependent RDDs concurrently? Or would it even be best in that case to run 3 completely separate jobs? We're planning on using YARN so there's 2 levels of scheduling going on. We're trying to figure out the best way to utilize the resources so that we are fully saturating the system and making sure there's constantly work being done rather than anything spinning gears waiting on upstream processing to occur (in mapreduce, we'd just submit a ton of jobs and have them wait in line).
RE: Determination of number of RDDs
Regarding: Can we create such an array and then parallelize it? Parallelizing an array of RDDs -> i.e. RDD[RDD[x]] is not possible. RDD is not serializable. From: Deep Pradhan [mailto:pradhandeep1...@gmail.com] Sent: 04 December 2014 15:39 To: user@spark.apache.org Subject: Determination of number of RDDs Hi, I have a graph and I want to create RDDs equal in number to the nodes in the graph. How can I do that? If I have 10 nodes then I want to create 10 rdds. Is that possible in GraphX? Like in C language we have array of pointers. Do we have array of RDDs in Spark. Can we create such an array and then parallelize it? Thank You
Re: Determination of number of RDDs
At 2014-12-04 02:08:45 -0800, Deep Pradhan wrote: > I have a graph and I want to create RDDs equal in number to the nodes in > the graph. How can I do that? > If I have 10 nodes then I want to create 10 rdds. Is that possible in > GraphX? This is possible: you can collect the elements to the driver, then create an RDD for each element. If you have so many elements that collecting them to the driver is infeasible, there's probably an alternative solution that doesn't involve creating one RDD per element. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
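In code, the collect-then-create approach looks roughly like this (a sketch; it assumes a GraphX graph whose vertex set fits on the driver):

val nodeIds = graph.vertices.map(_._1).collect()           // vertex ids, now a local array
val perNode = nodeIds.map(id => sc.parallelize(Seq(id)))   // one small RDD per node
// note this is an ordinary driver-side Scala array of RDDs; an RDD[RDD[_]]
// is not possible, as the other reply in this thread points out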
Determination of number of RDDs
Hi, I have a graph and I want to create RDDs equal in number to the nodes in the graph. How can I do that? If I have 10 nodes then I want to create 10 rdds. Is that possible in GraphX? Like in C language we have array of pointers. Do we have array of RDDs in Spark. Can we create such an array and then parallelize it? Thank You
Re: Loading RDDs in a streaming fashion
This is a common use case and this is how Hadoop APIs for reading data work, they return an Iterator [Your Record] instead of reading every record in at once. On Dec 1, 2014 9:43 PM, "Andy Twigg" wrote: > You may be able to construct RDDs directly from an iterator - not sure > - you may have to subclass your own. > > On 1 December 2014 at 18:40, Keith Simmons wrote: > > Yep, that's definitely possible. It's one of the workarounds I was > > considering. I was just curious if there was a simpler (and perhaps more > > efficient) approach. > > > > Keith > > > > On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg wrote: > >> > >> Could you modify your function so that it streams through the files > record > >> by record and outputs them to hdfs, then read them all in as RDDs and > take > >> the union? That would only use bounded memory. > >> > >> On 1 December 2014 at 17:19, Keith Simmons wrote: > >>> > >>> Actually, I'm working with a binary format. The api allows reading > out a > >>> single record at a time, but I'm not sure how to get those records into > >>> spark (without reading everything into memory from a single file at > once). > >>> > >>> > >>> > >>> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg > wrote: > >>>>> > >>>>> file => tranform file into a bunch of records > >>>> > >>>> > >>>> What does this function do exactly? Does it load the file locally? > >>>> Spark supports RDDs exceeding global RAM (cf the terasort example), > but > >>>> if your example just loads each file locally, then this may cause > problems. > >>>> Instead, you should load each file into an rdd with > context.textFile(), > >>>> flatmap that and union these rdds. > >>>> > >>>> also see > >>>> > >>>> > http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files > >>>> > >>>> > >>>> On 1 December 2014 at 16:50, Keith Simmons wrote: > >>>>> > >>>>> This is a long shot, but... > >>>>> > >>>>> I'm trying to load a bunch of files spread out over hdfs into an RDD, > >>>>> and in most cases it works well, but for a few very large files, I > exceed > >>>>> available memory. My current workflow basically works like this: > >>>>> > >>>>> context.parallelize(fileNames).flatMap { file => > >>>>> tranform file into a bunch of records > >>>>> } > >>>>> > >>>>> I'm wondering if there are any APIs to somehow "flush" the records > of a > >>>>> big dataset so I don't have to load them all into memory at once. I > know > >>>>> this doesn't exist, but conceptually: > >>>>> > >>>>> context.parallelize(fileNames).streamMap { (file, stream) => > >>>>> for every 10K records write records to stream and flush > >>>>> } > >>>>> > >>>>> Keith > >>>> > >>>> > >>> > >> > > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
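A sketch of that pattern (an illustration; `Record`, `openReader`, and the reader methods are stand-ins for whatever the binary format's API provides):

val records = sc.parallelize(fileNames).flatMap { file =>
  new Iterator[Record] {
    private val reader = openReader(file)  // positioned at the first record
    def hasNext: Boolean = reader.hasMoreRecords
    def next(): Record = reader.nextRecord()
  }
}
// flatMap accepts any TraversableOnce, so the iterator is consumed lazily
// and records stream through the task instead of materializing a whole file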
Re: Loading RDDs in a streaming fashion
You may be able to construct RDDs directly from an iterator - not sure - you may have to subclass your own. On 1 December 2014 at 18:40, Keith Simmons wrote: > Yep, that's definitely possible. It's one of the workarounds I was > considering. I was just curious if there was a simpler (and perhaps more > efficient) approach. > > Keith > > On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg wrote: >> >> Could you modify your function so that it streams through the files record >> by record and outputs them to hdfs, then read them all in as RDDs and take >> the union? That would only use bounded memory. >> >> On 1 December 2014 at 17:19, Keith Simmons wrote: >>> >>> Actually, I'm working with a binary format. The api allows reading out a >>> single record at a time, but I'm not sure how to get those records into >>> spark (without reading everything into memory from a single file at once). >>> >>> >>> >>> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg wrote: >>>>> >>>>> file => tranform file into a bunch of records >>>> >>>> >>>> What does this function do exactly? Does it load the file locally? >>>> Spark supports RDDs exceeding global RAM (cf the terasort example), but >>>> if your example just loads each file locally, then this may cause problems. >>>> Instead, you should load each file into an rdd with context.textFile(), >>>> flatmap that and union these rdds. >>>> >>>> also see >>>> >>>> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files >>>> >>>> >>>> On 1 December 2014 at 16:50, Keith Simmons wrote: >>>>> >>>>> This is a long shot, but... >>>>> >>>>> I'm trying to load a bunch of files spread out over hdfs into an RDD, >>>>> and in most cases it works well, but for a few very large files, I exceed >>>>> available memory. My current workflow basically works like this: >>>>> >>>>> context.parallelize(fileNames).flatMap { file => >>>>> tranform file into a bunch of records >>>>> } >>>>> >>>>> I'm wondering if there are any APIs to somehow "flush" the records of a >>>>> big dataset so I don't have to load them all into memory at once. I know >>>>> this doesn't exist, but conceptually: >>>>> >>>>> context.parallelize(fileNames).streamMap { (file, stream) => >>>>> for every 10K records write records to stream and flush >>>>> } >>>>> >>>>> Keith >>>> >>>> >>> >> > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Loading RDDs in a streaming fashion
Yep, that's definitely possible. It's one of the workarounds I was considering. I was just curious if there was a simpler (and perhaps more efficient) approach. Keith On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg wrote: > Could you modify your function so that it streams through the files record > by record and outputs them to hdfs, then read them all in as RDDs and take > the union? That would only use bounded memory. > > On 1 December 2014 at 17:19, Keith Simmons wrote: > >> Actually, I'm working with a binary format. The api allows reading out a >> single record at a time, but I'm not sure how to get those records into >> spark (without reading everything into memory from a single file at once). >> >> >> >> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg wrote: >> >>> file => tranform file into a bunch of records >>> >>> >>> What does this function do exactly? Does it load the file locally? >>> Spark supports RDDs exceeding global RAM (cf the terasort example), but >>> if your example just loads each file locally, then this may cause problems. >>> Instead, you should load each file into an rdd with context.textFile(), >>> flatmap that and union these rdds. >>> >>> also see >>> >>> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files >>> >>> >>> On 1 December 2014 at 16:50, Keith Simmons wrote: >>> >>>> This is a long shot, but... >>>> >>>> I'm trying to load a bunch of files spread out over hdfs into an RDD, >>>> and in most cases it works well, but for a few very large files, I exceed >>>> available memory. My current workflow basically works like this: >>>> >>>> context.parallelize(fileNames).flatMap { file => >>>> tranform file into a bunch of records >>>> } >>>> >>>> I'm wondering if there are any APIs to somehow "flush" the records of a >>>> big dataset so I don't have to load them all into memory at once. I know >>>> this doesn't exist, but conceptually: >>>> >>>> context.parallelize(fileNames).streamMap { (file, stream) => >>>> for every 10K records write records to stream and flush >>>> } >>>> >>>> Keith >>>> >>> >>> >> >
Re: Loading RDDs in a streaming fashion
Could you modify your function so that it streams through the files record by record and outputs them to hdfs, then read them all in as RDDs and take the union? That would only use bounded memory. On 1 December 2014 at 17:19, Keith Simmons wrote: > Actually, I'm working with a binary format. The api allows reading out a > single record at a time, but I'm not sure how to get those records into > spark (without reading everything into memory from a single file at once). > > > > On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg wrote: > >> file => tranform file into a bunch of records >> >> >> What does this function do exactly? Does it load the file locally? >> Spark supports RDDs exceeding global RAM (cf the terasort example), but >> if your example just loads each file locally, then this may cause problems. >> Instead, you should load each file into an rdd with context.textFile(), >> flatmap that and union these rdds. >> >> also see >> >> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files >> >> >> On 1 December 2014 at 16:50, Keith Simmons wrote: >> >>> This is a long shot, but... >>> >>> I'm trying to load a bunch of files spread out over hdfs into an RDD, >>> and in most cases it works well, but for a few very large files, I exceed >>> available memory. My current workflow basically works like this: >>> >>> context.parallelize(fileNames).flatMap { file => >>> tranform file into a bunch of records >>> } >>> >>> I'm wondering if there are any APIs to somehow "flush" the records of a >>> big dataset so I don't have to load them all into memory at once. I know >>> this doesn't exist, but conceptually: >>> >>> context.parallelize(fileNames).streamMap { (file, stream) => >>> for every 10K records write records to stream and flush >>> } >>> >>> Keith >>> >> >> >
Re: Loading RDDs in a streaming fashion
Actually, I'm working with a binary format. The api allows reading out a single record at a time, but I'm not sure how to get those records into spark (without reading everything into memory from a single file at once). On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg wrote: > file => tranform file into a bunch of records > > > What does this function do exactly? Does it load the file locally? > Spark supports RDDs exceeding global RAM (cf the terasort example), but if > your example just loads each file locally, then this may cause problems. > Instead, you should load each file into an rdd with context.textFile(), > flatmap that and union these rdds. > > also see > > http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files > > > On 1 December 2014 at 16:50, Keith Simmons wrote: > >> This is a long shot, but... >> >> I'm trying to load a bunch of files spread out over hdfs into an RDD, and >> in most cases it works well, but for a few very large files, I exceed >> available memory. My current workflow basically works like this: >> >> context.parallelize(fileNames).flatMap { file => >> tranform file into a bunch of records >> } >> >> I'm wondering if there are any APIs to somehow "flush" the records of a >> big dataset so I don't have to load them all into memory at once. I know >> this doesn't exist, but conceptually: >> >> context.parallelize(fileNames).streamMap { (file, stream) => >> for every 10K records write records to stream and flush >> } >> >> Keith >> > >
Re: Loading RDDs in a streaming fashion
> > file => tranform file into a bunch of records What does this function do exactly? Does it load the file locally? Spark supports RDDs exceeding global RAM (cf the terasort example), but if your example just loads each file locally, then this may cause problems. Instead, you should load each file into an rdd with context.textFile(), flatmap that and union these rdds. also see http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files On 1 December 2014 at 16:50, Keith Simmons wrote: > This is a long shot, but... > > I'm trying to load a bunch of files spread out over hdfs into an RDD, and > in most cases it works well, but for a few very large files, I exceed > available memory. My current workflow basically works like this: > > context.parallelize(fileNames).flatMap { file => > tranform file into a bunch of records > } > > I'm wondering if there are any APIs to somehow "flush" the records of a > big dataset so I don't have to load them all into memory at once. I know > this doesn't exist, but conceptually: > > context.parallelize(fileNames).streamMap { (file, stream) => > for every 10K records write records to stream and flush > } > > Keith >
Loading RDDs in a streaming fashion
This is a long shot, but... I'm trying to load a bunch of files spread out over hdfs into an RDD, and in most cases it works well, but for a few very large files, I exceed available memory. My current workflow basically works like this: context.parallelize(fileNames).flatMap { file => transform file into a bunch of records } I'm wondering if there are any APIs to somehow "flush" the records of a big dataset so I don't have to load them all into memory at once. I know this doesn't exist, but conceptually: context.parallelize(fileNames).streamMap { (file, stream) => for every 10K records write records to stream and flush } Keith
Re: RDDs join problem: incorrect result
What do you mean by incorrect? Could you please share some examples from both the source RDDs and the resultant RDD? Also, if you get any exception, paste that too; it helps to debug where the issue is. On 27 November 2014 at 17:07, liuboya [via Apache Spark User List] < ml-node+s1001560n19928...@n3.nabble.com> wrote: > Hi, >I ran into a problem when doing two RDDs join operation. For example, > RDDa: RDD[(String,String)] and RDDb:RDD[(String,Int)]. Then, the result > RDDc:[String,(String,Int)] = RDDa.join(RDDb). But I find the results in > RDDc are incorrect compared with RDDb. What's wrong in join? > -- Regards, Harihar Nahak BigData Developer Wynyard Email:hna...@wynyardgroup.com | Extn: 8019 ----- --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-join-problem-incorrect-result-tp19928p20056.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
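For reference, two common reasons join output looks "incorrect" are inner-join semantics (keys missing from either side are silently dropped) and duplicate keys (which produce one output row per pairing). A quick sketch of the former (output order may vary):

val a = sc.parallelize(Seq(("k1", "alice"), ("k2", "bob")))
val b = sc.parallelize(Seq(("k1", 10), ("k3", 7)))
a.join(b).collect()            // Array((k1,(alice,10))) -- k2 and k3 are dropped
a.leftOuterJoin(b).collect()   // Array((k1,(alice,Some(10))), (k2,(bob,None)))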
RDDs join problem: incorrect result
Hi, I ran into a problem when doing two RDDs join operation. For example, RDDa: RDD[(String,String)] and RDDb:RDD[(String,Int)]. Then, the result RDDc:[String,(String,Int)] = RDDa.join(RDDb). But I find the results in RDDc are incorrect compared with RDDb. What's wrong in join? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDDs-join-problem-incorrect-result-tp19928.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to join two RDDs with mutually exclusive keys
Thanks Daniel, applied join from PairRDDFunctions:

val countByUsername = file1.join(file2)
  .map { case (id, (username, count)) => (id, username, count) }

- --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19431.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to join two RDDs with mutually exclusive keys
Harihar, your question is the opposite of what was asked. In the future, please start a new thread for new questions. You want to do a join in your case. The join function does an inner join, which I think is what you want because you stated your IDs are common in both RDDs. For other cases you can look at leftOuterJoin, rightOuterJoin, and cogroup (alias groupWith). These are all on PairRDDFunctions (in Scala) [1] Alternatively, if one of your RDDs is small, you could collect it and broadcast the collection, using it in functions on the other RDD. But I don't think this will apply in your case because the number of records will be equal in each RDD. [2] [1] http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs [2] http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables On Thu, Nov 20, 2014 at 4:53 PM, Harihar Nahak wrote: > I've similar type of issue, want to join two different type of RDD in one > RDD > > file1.txt content (ID, counts) > val x : RDD[Long, Int] = sc.textFile("file1.txt").map( line => > line.split(",")).map(row => (row(0).toLong, row(1).toInt) > [(4407 ,40), > (2064, 38), > (7815 ,10), > (5736,17), > (8031,3)] > > Second RDD from : file2.txt contains (ID, name) > val y: RDD[(Long, String)]{where ID is common in both the RDDs} > [(4407 ,Jhon), > (2064, Maria), > (7815 ,Casto), > (5736,Ram), > (8031,XYZ)] > > and I'm expecting result should be like this : [(ID, Name, Count)] > [(4407 ,Jhon, 40), > (2064, Maria, 38), > (7815 ,Casto, 10), > (5736,Ram, 17), > (8031,XYZ, 3)] > > > Any help will really appreciate. Thanks > > > > > On 21 November 2014 09:18, dsiegmann [via Apache Spark User List] <[hidden > email] <http://user/SendEmail.jtp?type=node&node=19423&i=0>> wrote: > >> You want to use RDD.union (or SparkContext.union for many RDDs). These >> don't join on a key. Union doesn't really do anything itself, so it is low >> overhead. Note that the combined RDD will have all the partitions of the >> original RDDs, so you may want to coalesce after the union. >> >> val x = sc.parallelize(Seq( (1, 3), (2, 4) )) >> val y = sc.parallelize(Seq( (3, 5), (4, 7) )) >> val z = x.union(y) >> >> z.collect >> res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7)) >> >> >> On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith <[hidden email] >> <http://user/SendEmail.jtp?type=node&node=19419&i=0>> wrote: >> >>> Say I have two RDDs with the following values >>> >>> x = [(1, 3), (2, 4)] >>> >>> and >>> >>> y = [(3, 5), (4, 7)] >>> >>> and I want to have >>> >>> z = [(1, 3), (2, 4), (3, 5), (4, 7)] >>> >>> How can I achieve this. I know you can use outerJoin followed by map to >>> achieve this, but is there a more direct way for this. >>> >> >> >> >> -- >> Daniel Siegmann, Software Developer >> Velos >> Accelerating Machine Learning >> >> 54 W 40th St, New York, NY 10018 >> E: [hidden email] <http://user/SendEmail.jtp?type=node&node=19419&i=1> >> W: www.velos.io >> >> >> -- >> If you reply to this email, your message will be added to the >> discussion below: >> >> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19419.html >> To start a new topic under Apache Spark User List, email [hidden email] >> <http://user/SendEmail.jtp?type=node&node=19423&i=1> >> To unsubscribe from Apache Spark User List, click here. 
> -- > Regards, > Harihar Nahak > BigData Developer > Wynyard > [hidden email] | > Extn: 8019 > --Harihar > > -- > View this message in context: Re: How to join two RDDs with mutually > exclusive keys > <http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19423.html> > Sent from the Apache Spark User List mailing list archive > <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com. > -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
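For completeness, the broadcast alternative mentioned in [2] above, sketched against Harihar's x and y (a driver-side map join; as noted in the reply, it only pays off when one side is small):

val nameById = sc.broadcast(y.collect().toMap)  // feasible only if y is small
val result = x.map { case (id, count) =>
  (id, nameById.value.getOrElse(id, "unknown"), count)  // (ID, Name, Count)
}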
Re: How to join two RDDs with mutually exclusive keys
There is probably a better way to do it but I would register both as temp tables and then join them via SQL. BR, Daniel > On 20 בנוב׳ 2014, at 23:53, Harihar Nahak wrote: > > I've similar type of issue, want to join two different type of RDD in one RDD > > > file1.txt content (ID, counts) > val x : RDD[Long, Int] = sc.textFile("file1.txt").map( line => > line.split(",")).map(row => (row(0).toLong, row(1).toInt) > [(4407 ,40), > (2064, 38), > (7815 ,10), > (5736,17), > (8031,3)] > > Second RDD from : file2.txt contains (ID, name) > val y: RDD[(Long, String)]{where ID is common in both the RDDs} > [(4407 ,Jhon), > (2064, Maria), > (7815 ,Casto), > (5736,Ram), > (8031,XYZ)] > > and I'm expecting result should be like this : [(ID, Name, Count)] > [(4407 ,Jhon, 40), > (2064, Maria, 38), > (7815 ,Casto, 10), > (5736,Ram, 17), > (8031,XYZ, 3)] > > > Any help will really appreciate. Thanks > > > > >> On 21 November 2014 09:18, dsiegmann [via Apache Spark User List] <[hidden >> email]> wrote: >> You want to use RDD.union (or SparkContext.union for many RDDs). These don't >> join on a key. Union doesn't really do anything itself, so it is low >> overhead. Note that the combined RDD will have all the partitions of the >> original RDDs, so you may want to coalesce after the union. >> >> val x = sc.parallelize(Seq( (1, 3), (2, 4) )) >> val y = sc.parallelize(Seq( (3, 5), (4, 7) )) >> val z = x.union(y) >> >> z.collect >> res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7)) >> >> >>> On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith <[hidden email]> wrote: >>> Say I have two RDDs with the following values >>> >>> x = [(1, 3), (2, 4)] >>> and >>> >>> y = [(3, 5), (4, 7)] >>> and I want to have >>> >>> z = [(1, 3), (2, 4), (3, 5), (4, 7)] >>> How can I achieve this. I know you can use outerJoin followed by map to >>> achieve this, but is there a more direct way for this. >>> >> >> >> >> -- >> Daniel Siegmann, Software Developer >> Velos >> Accelerating Machine Learning >> >> 54 W 40th St, New York, NY 10018 >> E: [hidden email] W: www.velos.io >> >> >> If you reply to this email, your message will be added to the discussion >> below: >> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19419.html >> To start a new topic under Apache Spark User List, email [hidden email] >> To unsubscribe from Apache Spark User List, click here. >> NAML > > > > -- > Regards, > Harihar Nahak > BigData Developer > Wynyard > [hidden email] | Extn: 8019 > --Harihar > > View this message in context: Re: How to join two RDDs with mutually > exclusive keys > Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to join two RDDs with mutually exclusive keys
I've similar type of issue, want to join two different type of RDD in one RDD file1.txt content (ID, counts) val x : RDD[Long, Int] = sc.textFile("file1.txt").map( line => line.split(",")).map(row => (row(0).toLong, row(1).toInt) [(4407 ,40), (2064, 38), (7815 ,10), (5736,17), (8031,3)] Second RDD from : file2.txt contains (ID, name) val y: RDD[(Long, String)]{where ID is common in both the RDDs} [(4407 ,Jhon), (2064, Maria), (7815 ,Casto), (5736,Ram), (8031,XYZ)] and I'm expecting result should be like this : [(ID, Name, Count)] [(4407 ,Jhon, 40), (2064, Maria, 38), (7815 ,Casto, 10), (5736,Ram, 17), (8031,XYZ, 3)] Any help will really appreciate. Thanks On 21 November 2014 09:18, dsiegmann [via Apache Spark User List] < ml-node+s1001560n19419...@n3.nabble.com> wrote: > You want to use RDD.union (or SparkContext.union for many RDDs). These > don't join on a key. Union doesn't really do anything itself, so it is low > overhead. Note that the combined RDD will have all the partitions of the > original RDDs, so you may want to coalesce after the union. > > val x = sc.parallelize(Seq( (1, 3), (2, 4) )) > val y = sc.parallelize(Seq( (3, 5), (4, 7) )) > val z = x.union(y) > > z.collect > res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7)) > > > On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith <[hidden email] > <http://user/SendEmail.jtp?type=node&node=19419&i=0>> wrote: > >> Say I have two RDDs with the following values >> >> x = [(1, 3), (2, 4)] >> >> and >> >> y = [(3, 5), (4, 7)] >> >> and I want to have >> >> z = [(1, 3), (2, 4), (3, 5), (4, 7)] >> >> How can I achieve this. I know you can use outerJoin followed by map to >> achieve this, but is there a more direct way for this. >> > > > > -- > Daniel Siegmann, Software Developer > Velos > Accelerating Machine Learning > > 54 W 40th St, New York, NY 10018 > E: [hidden email] <http://user/SendEmail.jtp?type=node&node=19419&i=1> W: > www.velos.io > > > -- > If you reply to this email, your message will be added to the discussion > below: > > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19419.html > To start a new topic under Apache Spark User List, email > ml-node+s1001560n1...@n3.nabble.com > To unsubscribe from Apache Spark User List, click here > <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=aG5haGFrQHd5bnlhcmRncm91cC5jb218MXwtMTgxOTE5MTkyOQ==> > . > NAML > <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml> > -- Regards, Harihar Nahak BigData Developer Wynyard Email:hna...@wynyardgroup.com | Extn: 8019 - --Harihar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19423.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to join two RDDs with mutually exclusive keys
You want to use RDD.union (or SparkContext.union for many RDDs). These don't join on a key. Union doesn't really do anything itself, so it is low overhead. Note that the combined RDD will have all the partitions of the original RDDs, so you may want to coalesce after the union. val x = sc.parallelize(Seq( (1, 3), (2, 4) )) val y = sc.parallelize(Seq( (3, 5), (4, 7) )) val z = x.union(y) z.collect res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7)) On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith wrote: > Say I have two RDDs with the following values > > x = [(1, 3), (2, 4)] > > and > > y = [(3, 5), (4, 7)] > > and I want to have > > z = [(1, 3), (2, 4), (3, 5), (4, 7)] > > How can I achieve this. I know you can use outerJoin followed by map to > achieve this, but is there a more direct way for this. > -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
How to join two RDDs with mutually exclusive keys
Say I have two RDDs with the following values x = [(1, 3), (2, 4)] and y = [(3, 5), (4, 7)] and I want to have z = [(1, 3), (2, 4), (3, 5), (4, 7)] How can I achieve this. I know you can use outerJoin followed by map to achieve this, but is there a more direct way for this.
Re: Transform RDD.groupBY result to multiple RDDs
What's your use case? You would not generally want to make so many small RDDs. On Nov 20, 2014 6:19 AM, "Dai, Kevin" wrote: > Hi, all > > > > Suppose I have an RDD of (K, V) tuples and I do groupBy on it with the key K. > > > > My question is how to make each groupBy result, which is (K, Iterable[V]), an > RDD. > > > > BTW, can we transform it into a DStream so that each groupBy result is an > RDD in it? > > > > Best Regards, > > Kevin. >
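If the goal is per-key processing rather than literally one RDD per key, the usual pattern is to stay keyed; a minimal sketch:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val grouped = pairs.groupByKey()       // RDD[(String, Iterable[Int])], one element per key
val summed = pairs.reduceByKey(_ + _)  // often preferable: aggregates without materializing groups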
Transform RDD.groupBY result to multiple RDDs
Hi, all Suppose I have an RDD of (K, V) tuples and I do groupBy on it with the key K. My question is how to make each groupBy result, which is (K, Iterable[V]), an RDD. BTW, can we transform it into a DStream so that each groupBy result is an RDD in it? Best Regards, Kevin.
Re: Declaring multiple RDDs and efficiency concerns
This code executes on the driver, and an "RDD" here is really just a handle on all the distributed data out there. It's a local bookkeeping object. So, manipulation of these objects themselves in the local driver code has virtually no performance impact. These two versions would be about identical*. * maybe someone can point out a case where not maintaining the reference lets something get cleaned up earlier, but I'm not aware of this sort of effect On Fri, Nov 14, 2014 at 4:31 PM, Simone Franzini wrote: > Let's say I have to apply a complex sequence of operations to a certain RDD. > In order to make code more modular/readable, I would typically have > something like this: > > object myObject { > def main(args: Array[String]) { > val rdd1 = function1(myRdd) > val rdd2 = function2(rdd1) > val rdd3 = function3(rdd2) > } > > def function1(rdd: RDD) : RDD = { doSomething } > def function2(rdd: RDD) : RDD = { doSomethingElse } > def function3(rdd: RDD) : RDD = { doSomethingElseYet } > } > > So I am explicitly declaring vals for the intermediate steps. Does this end > up using more storage than if I just chained all of the operations and > declared only one val instead? > If yes, is there a better way to chain together the operations? > Ideally I would like to do something like: > > val rdd = function1.function2.function3 > > Is there a way I can write the signature of my functions to accomplish this? > Is this also an efficiency issue or just a stylistic one? > > Simone Franzini, PhD > > http://www.linkedin.com/in/simonefranzini - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
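On the stylistic question, ordinary Scala function composition already gives the chained form, assuming the three functions have compatible RDD element types (a sketch):

val pipeline = (function1 _) andThen function2 andThen function3
val rdd3 = pipeline(myRdd)
// equivalent to function3(function2(function1(myRdd))); per the explanation
// above, the intermediate handles cost nothing either way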