Hi!

Using an iterator solved the problem! I've been chewing on this for days, so 
thanks a lot to both of you!! :)
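
For reference, the working version now looks roughly like this - the same flatMap 
as before, just iterating over the values lazily:

rdd.flatMap { case (key, values) =>
  // values.iterator is lazy, so the ~n^2 pairs are produced one at a time
  // instead of being materialized as one huge in-memory collection first
  for (v1 <- values.iterator; v2 <- values.iterator) yield ((v1, v2), 1)
}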

Since an earlier version of my code used a self-join to do the same thing and ran 
into the same problems, I just looked at the implementation of 
PairRDDFunctions.join (Spark v1.1.1):

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1; w <- pair._2) yield (v, w)
  )
}

Is there a reason not to use an iterator here, if that is possible? Pardon my 
lack of Scala knowledge... This should in any case cause the same problems I ran 
into once the vs/ws get too large. (Though that question probably belongs on the 
dev mailing list.)
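
Naively, I would have expected a variant like this to behave better - just an 
untested sketch on my part, swapping .iterator in on both cogrouped sides:

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    // iterate both sides lazily instead of building the full cross product eagerly
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}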

Thanks!
Johannes

> On 10.12.2014 at 13:44, Shixiong Zhu <zsxw...@gmail.com> wrote:
> 
> for(v1 <- values; v2 <- values) yield ((v1, v2), 1) will generate all of the 
> data at once and return all of it to flatMap.
> 
> To solve your problem, you should use for (v1 <- values.iterator; v2 <- 
> values.iterator) yield ((v1, v2), 1), which only generates the data when it is 
> needed.
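> 
> Roughly, the difference in the desugared form is (just a sketch of what the 
> compiler generates from the two for comprehensions):
> 
> // On an Iterable, the for/yield desugars into flatMap/map calls that build the
> // complete result collection in memory before handing it back:
> values.flatMap(v1 => values.map(v2 => ((v1, v2), 1)))
> 
> // On an Iterator, the same desugaring produces a lazy Iterator, so each
> // ((v1, v2), 1) is only created when the consumer asks for the next element:
> values.iterator.flatMap(v1 => values.iterator.map(v2 => ((v1, v2), 1)))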
> 
> 
> Best Regards,
> 
> Shixiong Zhu
> 
> 2014-12-10 20:13 GMT+08:00 Johannes Simon <johannes.si...@mail.de>:
> Hi!
> 
> I have been using Spark a lot recently, and it has been running really well 
> and fast, but now that I have increased the data size, it is starting to run 
> into problems:
> I have an RDD of the form (String, Iterable[String]) - the Iterable[String] 
> was produced by a groupByKey() - and I perform a flatMap on it that outputs 
> some form of Cartesian product of the values per key:
> 
> 
> rdd.flatMap { case (key, values) =>
>   for (v1 <- values; v2 <- values) yield ((v1, v2), 1)
> }
> 
> 
> So the runtime cost per RDD entry is O(n^2), where n is the number of values. 
> This n can sometimes be 10,000 or even 100,000. That produces a lot of data - I 
> am aware of that, but otherwise I wouldn't need a cluster, would I? :) For 
> n <= 1,000 this operation works quite well. But as soon as I allow n to go up 
> to 10,000 or higher, I start to get "GC overhead limit exceeded" exceptions.
> 
> Configuration:
> - 7g executor memory
> - spark.shuffle.memoryFraction=0.5
> - spark.storage.memoryFraction=0.1
> I am not sure how the remaining memory for the actual JVM instance performing 
> the flatMap is computed, but I would assume it to be something like 
> (1-0.5-0.1)*7g = 2.8g
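> 
> For completeness, this is roughly how those values are set (the fraction 
> settings via SparkConf; the executor memory is passed to spark-submit as 
> --executor-memory 7g):
> 
> import org.apache.spark.{SparkConf, SparkContext}
> 
> val conf = new SparkConf()
>   .set("spark.shuffle.memoryFraction", "0.5")  // heap fraction for shuffle buffers
>   .set("spark.storage.memoryFraction", "0.1")  // heap fraction for cached RDDs
> val sc = new SparkContext(conf)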
> 
> Now I am wondering: why should these 2.8g (or even just a few hundred MB) not 
> suffice for Spark to process this flatMap without too much GC overhead? If I 
> assume a string to be 10 characters on average, and therefore to consume about 
> 60 bytes with overhead taken into account, then 10,000 of these values add up 
> to no more than ~600 KB, and apart from that Spark should never have to keep 
> anything else in memory.
> 
> My question: when does Spark start to spill RDD entries to disk, assuming 
> that no RDD is to be persisted? Does it keep all output of the flatMap 
> operation in memory until the entire flatMap is done? Or does it already spill 
> every single yielded "((v1, v2), 1)" entry to disk if necessary?
> 
> Thanks a lot!
> Johannes
