Hi! Using an iterator solved the problem! I've been chewing on this for days, so thanks a lot to both of you!! :)
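For reference, a minimal sketch of what the working version looks like now (using the rdd/key/values names from my original example quoted below):

    // Using .iterator makes the for-comprehension produce an Iterator,
    // so the ((v1, v2), 1) pairs are generated lazily as flatMap consumes
    // them, instead of being materialized as one huge collection per key.
    rdd.flatMap { case (key, values) =>
      for (v1 <- values.iterator; v2 <- values.iterator) yield ((v1, v2), 1)
    }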
Since an earlier version of my code used a self-join to do the same thing and ran into the same problems, I just looked at the implementation of PairRDDFunctions.join (Spark v1.1.1):

    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
      this.cogroup(other, partitioner).flatMapValues( pair =>
        for (v <- pair._1; w <- pair._2) yield (v, w)
      )
    }

Is there a reason not to use an iterator here as well, if possible? Pardon my lack of Scala knowledge... This should in any case cause the same problems I had once the size of vs/ws gets too large. (Though that question is probably more of a dev mailing list question.)

Thanks!
Johannes

> On 10.12.2014, at 13:44, Shixiong Zhu <zsxw...@gmail.com> wrote:
> 
> for(v1 <- values; v2 <- values) yield ((v1, v2), 1) will generate all data at once and return all of it to flatMap.
> 
> To solve your problem, you should use for (v1 <- values.iterator; v2 <- values.iterator) yield ((v1, v2), 1), which will generate the data only when it's necessary.
> 
> Best Regards,
> Shixiong Zhu
> 
> 2014-12-10 20:13 GMT+08:00 Johannes Simon <johannes.si...@mail.de>:
> Hi!
> 
> I have been using Spark a lot recently and it's been running really well and fast, but now that I increase the data size, it's starting to run into problems:
> I have an RDD in the form of (String, Iterable[String]) - the Iterable[String] was produced by a groupByKey() - and I perform a flatMap on it that outputs some form of cartesian product of the values per key:
> 
> rdd.flatMap({case (key, values) => for(v1 <- values; v2 <- values) yield ((v1, v2), 1)})
> 
> So the runtime cost per RDD entry is O(n^2), where n is the number of values. This n can sometimes be 10,000 or even 100,000. That produces a lot of data, I am aware of that, but otherwise I wouldn't need a cluster, would I? :) For n <= 1,000 this operation works quite well. But as soon as I allow n to be 10,000 or higher, I start to get "GC overhead limit exceeded" exceptions.
> 
> Configuration:
> - 7g executor memory
> - spark.shuffle.memoryFraction=0.5
> - spark.storage.memoryFraction=0.1
> 
> I am not sure how the remaining memory for the actual JVM instance performing the flatMap is computed, but I would assume it to be something like (1 - 0.5 - 0.1) * 7g = 2.8g.
> 
> Now I am wondering: why should these 2.8g (or even just a few hundred MB) not suffice for Spark to process this flatMap without too much GC overhead? If I assume a string to be 10 characters on average, therefore consuming about 60 bytes with overhead taken into account, then 10,000 of these values sum up to no more than ~600 KB, and apart from that Spark never has to keep anything else in memory.
> 
> My question: when does Spark start to spill RDD entries to disk, assuming that no RDD is to be persisted? Does it keep all output of the flatMap operation in memory until the entire flatMap is done? Or does it already spill every single yielded ((v1, v2), 1) entry out to disk if necessary?
> 
> Thanks a lot!
> Johannes
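PS: Just to make my question about join concrete - purely for illustration, not the actual Spark source - the iterator-based variant I was imagining would look roughly like this:

    // Hypothetical variant of the flatMapValues body above, not Spark's
    // actual implementation: calling .iterator on the cogrouped Iterables
    // would yield the (v, w) pairs lazily instead of building the full
    // cross product per key in memory.
    this.cogroup(other, partitioner).flatMapValues { pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    }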