flatMap and spilling of output to disk

2014-12-10 Thread Johannes Simon
Hi!

I have been using Spark a lot recently and it's been running really well and
fast, but now that I increase the data size, it's starting to run into problems:
I have an RDD of (String, Iterable[String]) pairs - the Iterable[String]
was produced by a groupByKey() - and I perform a flatMap on it that outputs
a form of Cartesian product of the values per key:


rdd.flatMap({ case (key, values) =>
  for (v1 <- values; v2 <- values) yield ((v1, v2), 1)
})


So the runtime cost per RDD entry is O(n^2), where n is the number of values.
This n can sometimes be 10,000 or even 100,000. That produces a lot of data, I
am aware of that, but otherwise I wouldn't need a cluster, would I? :) For
n=1,000 this operation works quite well. But as soon as I allow n to be 10,000
or higher, I start to get "GC overhead limit exceeded" exceptions.

Configuration:
- 7g executor memory
- spark.shuffle.memoryFraction=0.5
- spark.storage.memoryFraction=0.1
I am not sure how the remaining memory for the actual JVM instance performing 
the flatMap is computed, but I would assume it to be something like 
(1-0.5-0.1)*7g = 2.8g
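
For reference, here is a minimal sketch of how such settings can be passed in
programmatically on Spark 1.x (the values are just the ones listed above; the
app name is a placeholder, not part of the original setup):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of a Spark 1.x driver setup; the memory settings mirror the
// configuration described in this thread and are not tuned recommendations.
val conf = new SparkConf()
  .setAppName("per-key-cartesian-product")
  .set("spark.executor.memory", "7g")
  .set("spark.shuffle.memoryFraction", "0.5")
  .set("spark.storage.memoryFraction", "0.1")
val sc = new SparkContext(conf)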

Now I am wondering: why should these 2.8g (or even just a few hundred MB) not
suffice for Spark to process this flatMap without too much GC overhead? If I
assume a string to be 10 characters on average, and therefore about 60 bytes
with JVM object overhead taken into account, then 10,000 of these values add up
to no more than ~600 KB, and apart from that Spark never has to keep anything
else in memory.
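
As a back-of-the-envelope check of that estimate (the ~60-byte per-string
footprint is an assumption, as stated above):

// 10,000 strings at an assumed ~60 bytes each
val n = 10000
val bytesPerString = 60
val totalBytes = n * bytesPerString
println(totalBytes)  // 600000 bytes, i.e. roughly 600 KB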

My question: when does Spark start to spill RDD entries to disk, assuming that
no RDD is to be persisted? Does it keep all output of the flatMap operation in
memory until the entire flatMap is done? Or does it spill each yielded
((v1, v2), 1) entry out to disk as soon as necessary?

Thanks a lot!
Johannes
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: flatMap and spilling of output to disk

2014-12-10 Thread Sean Owen
You are rightly thinking that Spark should be able to just stream
this massive collection of pairs you are creating, and never need to
put it all in memory. That's true, but your function actually creates
a huge collection of pairs in memory before Spark ever touches it.

This is going to materialize all pairs in a collection:

for (v1 <- values; v2 <- values) yield ((v1, v2), 1)

In fact, you only need to return a TraversableOnce to Spark. An
Iterator is just fine, for example.

Others probably have a better suggestion, but I think you can try
something along these lines instead:

values.toIterator.flatMap(v1 => values.toIterator.map(v2 => ((v1, v2), 1)))

That should mean you hold at most N tuples in memory (depends on how
flatMap works I guess), not N^2.

That's the problem, and I think there is a solution along these lines,
but I haven't tested it or thought about it for more than 2 minutes.

After that, Spark's other behaviors should be able to help you.
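
For context, here is a minimal end-to-end sketch of that idea (untested;
cartesianPerKey, the String element type and the input RDD are placeholders
for illustration, not code from this thread):

import org.apache.spark.SparkContext._  // pair-RDD implicits on Spark 1.x
import org.apache.spark.rdd.RDD

// Groups by key, then lazily enumerates the per-key cross product, so no
// n^2 collection is materialized before Spark consumes the pairs.
def cartesianPerKey(rdd: RDD[(String, String)]): RDD[((String, String), Int)] =
  rdd.groupByKey().flatMap { case (_, values) =>
    values.iterator.flatMap(v1 => values.iterator.map(v2 => ((v1, v2), 1)))
  }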

On Wed, Dec 10, 2014 at 12:13 PM, Johannes Simon johannes.si...@mail.de wrote:



Re: flatMap and spilling of output to disk

2014-12-10 Thread Shixiong Zhu
for (v1 <- values; v2 <- values) yield ((v1, v2), 1) will generate all the data
at once and return all of it to flatMap.

To solve your problem, you should use for (v1 <- values.iterator; v2 <-
values.iterator) yield ((v1, v2), 1), which will generate the data lazily, only
when it's needed.
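
Here is a small plain-Scala illustration of that difference, independent of
Spark (values is just a toy collection for the example):

val values: Iterable[String] = Seq("a", "b", "c")

// Strict: the for-comprehension over the Iterable builds the whole n^2
// collection of pairs up front.
val strictPairs = for (v1 <- values; v2 <- values) yield ((v1, v2), 1)

// Lazy: going through values.iterator yields an Iterator that produces
// one ((v1, v2), 1) tuple at a time, on demand.
val lazyPairs = for (v1 <- values.iterator; v2 <- values.iterator) yield ((v1, v2), 1)

println(strictPairs.size)        // 9 - all pairs materialized
println(lazyPairs.take(2).size)  // 2 - only the requested pairs are produced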

Best Regards,
Shixiong Zhu

2014-12-10 20:13 GMT+08:00 Johannes Simon johannes.si...@mail.de:





Re: flatMap and spilling of output to disk

2014-12-10 Thread Johannes Simon
Hi!

Using an iterator solved the problem! I've been chewing on this for days, so 
thanks a lot to both of you!! :)

Since an earlier version of my code used a self-join to do the same thing and
ran into the same problems, I just looked at the implementation of
PairRDDFunctions.join (Spark v1.1.1):

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1; w <- pair._2) yield (v, w)
  )
}

Is there a reason not to use an iterator here as well, if possible? Pardon my
lack of Scala knowledge... This should in any case cause the same problems I
had once the collections of v's and w's get too large. (Though that question is
probably better suited for the dev mailing list.)
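
Purely as an untested sketch of what I have in mind (my Scala may well be off,
and this is not the actual Spark code), the body could be changed along these
lines:

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    // enumerate the pairs lazily instead of materializing them all at once
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}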

Thanks!
Johannes

On 10.12.2014, at 13:44, Shixiong Zhu zsxw...@gmail.com wrote:
 



Re: flatMap and spilling of output to disk

2014-12-10 Thread Shixiong Zhu
Good catch. `join` should use `Iterator`, too. I opened a JIRA here:
https://issues.apache.org/jira/browse/SPARK-4824

Best Regards,
Shixiong Zhu

2014-12-10 21:35 GMT+08:00 Johannes Simon johannes.si...@mail.de:
