You are rightly thinking that Spark should be able to just "stream"
this massive collection of pairs as you create it, and never need to
hold it all in memory. That's true, but your function actually builds
the entire collection of pairs in memory before Spark ever touches it.

This for-comprehension eagerly materializes all of the pairs in a
single in-memory collection:

for(v1 <- values; v2 <- values) yield ((v1, v2), 1)

In fact, you only need to return a TraversableOnce to Spark. An
Iterator is just fine, for example.

Others probably have a better suggestion, but I think you can try
something along these lines instead:

values.toIterator.flatMap(v1 => values.toIterator.map(v2 => ((v1, v2), 1)))

That should mean you hold at most on the order of N tuples in memory
at a time (depending on how flatMap consumes the iterator), not N^2.
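
Dropped back into your flatMap, that would look roughly like this
(untested sketch; I'm assuming the RDD really is
RDD[(String, Iterable[String])] as you describe):

rdd.flatMap { case (key, values) =>
  // each ((v1, v2), 1) is produced lazily as Spark pulls from the
  // iterator, instead of being built up as one big collection first
  values.toIterator.flatMap(v1 =>
    values.toIterator.map(v2 => ((v1, v2), 1)))
}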

That's the problem, and I think there is a solution along these lines,
but I haven't tested it or thought about it for more than 2 minutes.

After that, Spark's own machinery (lazy consumption of the iterator,
and spilling to disk during the shuffle) should help you with the rest.
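
For example, if the next step is to count these pairs (my assumption,
given the ((v1, v2), 1) you yield; adjust to whatever you actually do
downstream), reduceByKey lets Spark combine counts map-side and spill
during the shuffle, so nothing has to pile up in your own code
(assuming the usual Spark imports are in scope):

// sketch only; "counts" is just an illustrative name
val counts = rdd
  .flatMap { case (key, values) =>
    values.toIterator.flatMap(v1 =>
      values.toIterator.map(v2 => ((v1, v2), 1)))
  }
  .reduceByKey(_ + _)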

On Wed, Dec 10, 2014 at 12:13 PM, Johannes Simon <johannes.si...@mail.de> wrote:
> Hi!
>
> I have been using Spark a lot recently and it's been running really well and 
> fast, but now when I increase the data size, it's starting to run into 
> problems:
> I have an RDD in the form of (String, Iterable[String]) - the 
> Iterable[String] was produced by a groupByKey() - and I perform a flatMap on 
> it that outputs some form of Cartesian product of the values per key:
>
>
> rdd.flatMap({ case (key, values) =>
>   for (v1 <- values; v2 <- values) yield ((v1, v2), 1)
> })
>
>
> So the runtime cost per RDD entry is O(n^2), where n is the number of values. 
> This n can sometimes be 10,000 or even 100,000. That produces a lot of data, 
> I am aware of that, but otherwise I wouldn't need a cluster, would I? :) For 
> n <= 1,000 this operation works quite well. But as soon as I allow n to reach 
> 10,000 or higher, I start to get "GC overhead limit exceeded" exceptions.
>
> Configuration:
> - 7g executor memory
> - spark.shuffle.memoryFraction=0.5
> - spark.storage.memoryFraction=0.1
> I am not sure how the remaining memory for the actual JVM instance performing 
> the flatMap is computed, but I would assume it to be something like 
> (1-0.5-0.1)*7g = 2.8g
>
> Now I am wondering: Why should these 2.8g (or say even a few hundred MB) not 
> suffice for Spark to process this flatMap without too much GC overhead? If I 
> assume a string to be 10 characters on average, therefore consuming about 60 
> bytes with overhead taken into account, then 10,000 of these values sum up to 
> no more than ~600 KB, and apart from that, Spark never has to keep anything 
> else in memory.
>
> My question: When does Spark start to spill RDD entries to disk, assuming 
> that no RDD is to be persisted? Does it keep all output of the flatMap 
> operation in memory until the entire flatMap is done? Or does it already 
> spill every single yielded "((v1, v2), 1)" entry out to disk if necessary?
>
> Thanks a lot!
> Johannes

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
