Good catch. `join` should use `Iterator`, too. I opened a JIRA here:
https://issues.apache.org/jira/browse/SPARK-4824
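
The fix would be along these lines (just a sketch; see the JIRA for the actual patch):

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    // Iterate lazily instead of materializing the full cross product.
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}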

Best Regards,
Shixiong Zhu

2014-12-10 21:35 GMT+08:00 Johannes Simon <johannes.si...@mail.de>:

> Hi!
>
> Using an iterator solved the problem! I've been chewing on this for days,
> so thanks a lot to both of you!! :)
>
> In an earlier version of my code I used a self-join to do the same thing and
> ran into the same problems, so I just looked at the implementation of
> PairRDDFunctions.join (Spark v1.1.1):
>
> def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
>   this.cogroup(other, partitioner).flatMapValues( pair =>
>     for (v <- pair._1; w <- pair._2) yield (v, w)
>   )
> }
>
> Is there a reason not to use an iterator here as well? Pardon my lack of
> Scala knowledge... This should in any case cause the same problems I had
> when the v/w collections get too large. (Though that question probably
> belongs on the dev mailing list.)
>
> Thanks!
> Johannes
>
> Am 10.12.2014 um 13:44 schrieb Shixiong Zhu <zsxw...@gmail.com>:
>
> for (v1 <- values; v2 <- values) yield ((v1, v2), 1) will generate all of
> the data at once and return it to flatMap as a single collection.
>
> To solve your problem, you should use for (v1 <- values.iterator; v2 <-
> values.iterator) yield ((v1, v2), 1), which generates the data lazily, only
> when it is needed.
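>
> For example (a minimal, standalone illustration):
>
> val values = Seq("a", "b", "c")
>
> // Strict: the for-comprehension builds all n*n pairs in memory as one collection.
> val strictPairs = for (v1 <- values; v2 <- values) yield ((v1, v2), 1)
>
> // Lazy: an Iterator that produces one pair at a time, on demand.
> val lazyPairs = for (v1 <- values.iterator; v2 <- values.iterator) yield ((v1, v2), 1)
>
> Returning the Iterator from your flatMap function is fine, since flatMap
> only requires a TraversableOnce.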
>
> Best Regards,
> Shixiong Zhu
>
> 2014-12-10 20:13 GMT+08:00 Johannes Simon <johannes.si...@mail.de>:
>
>> Hi!
>>
>> I have been using Spark a lot recently and it has been running really well
>> and fast, but now that I am increasing the data size I am starting to run
>> into problems:
>> I have an RDD of the form (String, Iterable[String]) - the Iterable[String]
>> was produced by a groupByKey() - and I perform a flatMap on it that outputs
>> the Cartesian product of each key's values with themselves:
>>
>>
>> rdd.flatMap { case (key, values) =>
>>   for (v1 <- values; v2 <- values) yield ((v1, v2), 1)
>> }
>>
>>
>> So the runtime cost per RDD entry is O(n^2), where n is the number of
>> values. This n can sometimes be 10,000 or even 100,000. That produces a lot
>> of data, I am aware of that, but otherwise I wouldn't need a cluster, would
>> I? :) For n <= 1,000 this operation works quite well, but as soon as I allow
>> n to go up to 10,000 or higher, I start to get "GC overhead limit exceeded"
>> exceptions.
>>
>> Configuration:
>> - 7g executor memory
>> - spark.shuffle.memoryFraction=0.5
>> - spark.storage.memoryFraction=0.1
>> I am not sure how the remaining memory for the actual JVM instance
>> performing the flatMap is computed, but I would assume it to be something
>> like (1-0.5-0.1)*7g = 2.8g
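>>
>> (In SparkConf terms this is roughly the following - a sketch only, with app
>> name and master as placeholders:)
>>
>> import org.apache.spark.{SparkConf, SparkContext}
>>
>> val conf = new SparkConf()
>>   .setAppName("cartesian-pairs")              // placeholder name
>>   .setMaster("local[*]")                      // in practice set via spark-submit
>>   .set("spark.executor.memory", "7g")
>>   .set("spark.shuffle.memoryFraction", "0.5")
>>   .set("spark.storage.memoryFraction", "0.1")
>> val sc = new SparkContext(conf)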
>>
>> Now I am wondering: why should these 2.8g (or even just a few hundred MB)
>> not suffice for Spark to process this flatMap without too much GC overhead?
>> If I assume a string to be 10 characters on average, and therefore to
>> consume about 60 bytes once JVM overhead is taken into account, then 10,000
>> of these values sum up to no more than ~600 KB, and apart from that Spark
>> never has to keep anything else in memory.
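>>
>> (Rough arithmetic behind that estimate - the ~40 bytes of per-String JVM
>> overhead is an assumption, not a measurement:)
>>
>> val bytesPerString = 40 + 10 * 2        // ~60 bytes for a 10-character String
>> val totalBytes = 10000 * bytesPerString // 600,000 bytes, i.e. ~600 KB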
>>
>> My question: when does Spark start to spill RDD entries to disk, assuming
>> that no RDD is to be persisted? Does it keep all output of the flatMap
>> operation in memory until the entire flatMap is done? Or does it spill each
>> yielded "((v1, v2), 1)" entry out to disk as necessary?
>>
>> Thanks a lot!
>> Johannes
>>
>
>
