flatMap and spilling of output to disk
Hi! I have been using Spark a lot recently and it's been running really well and fast, but now when I increase the data size, it's starting to run into problems: I have an RDD in the form of (String, Iterable[String]) - the Iterable[String] was produced by a groupByKey() - and I perform a flatMap on it that outputs some form of cartesian product of the values per key:

rdd.flatMap({ case (key, values) => for (v1 <- values; v2 <- values) yield ((v1, v2), 1) })

So the runtime cost per RDD entry is O(n^2), where n is the number of values. This n can sometimes be 10,000 or even 100,000. That produces a lot of data, I am aware of that, but otherwise I wouldn't need a cluster, would I? :)

For n=1000 this operation works quite well. But as soon as I allow n to be 10,000 or higher, I start to get "GC overhead limit exceeded" exceptions.

Configuration:
- 7g executor memory
- spark.shuffle.memoryFraction=0.5
- spark.storage.memoryFraction=0.1

I am not sure how the remaining memory for the actual JVM instance performing the flatMap is computed, but I would assume it to be something like (1-0.5-0.1)*7g = 2.8g. Now I am wondering: why should these 2.8g (or even a few hundred MB) not suffice for Spark to process this flatMap without too much GC overhead? If I assume a string to be 10 characters on average, therefore consuming about 60 bytes with overhead taken into account, then 10,000 of these values sum up to no more than ~600kb, and apart from that Spark never has to keep anything else in memory.

My questions: When does Spark start to spill RDD entries to disk, assuming that no RDD is to be persisted? Does it keep all output of the flatMap operation in memory until the entire flatMap is done? Or does it already spill every single yielded ((v1, v2), 1) entry out to disk if necessary?

Thanks a lot!
Johannes

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
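A minimal standalone sketch (mine, not from the thread; no Spark involved) of the per-key work this flatMap does, showing that the for-comprehension over an Iterable materializes the full n^2-element collection at once:

```scala
// Sketch of the per-key cartesian product described above.
object CartesianSketch {
  def main(args: Array[String]): Unit = {
    val values: Iterable[String] = (1 to 100).map(i => s"v$i")
    // The same for-comprehension as in the flatMap: it builds the whole
    // n^2-element collection in memory before anything downstream sees it.
    val pairs = for (v1 <- values; v2 <- values) yield ((v1, v2), 1)
    assert(pairs.size == 100 * 100) // n = 100 -> 10,000 pairs held at once
  }
}
```

With n = 10,000 the same expression holds 100 million tuples per key, which matches the GC pressure described above.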
Re: flatMap and spilling of output to disk
You are rightly thinking that Spark should be able to just stream this massive collection of pairs you are creating, and never need to put it all in memory. That's true; but your function actually creates a huge collection of pairs in memory before Spark ever touches it. This is going to materialize all pairs in a collection:

for (v1 <- values; v2 <- values) yield ((v1, v2), 1)

In fact, you only need to return a TraversableOnce to Spark. An Iterator is just fine, for example. Others probably have a better suggestion, but I think you can try something along these lines instead:

values.toIterator.flatMap(v1 => values.toIterator.map(v2 => ((v1, v2), 1)))

That should mean you hold at most N tuples in memory (depending on how flatMap works, I guess), not N^2. That's the problem, and I think there is a solution along these lines, but I haven't tested it or thought about it for more than 2 minutes. After that you should allow other behaviors from Spark to help you.

On Wed, Dec 10, 2014 at 12:13 PM, Johannes Simon <johannes.si...@mail.de> wrote:
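A quick sketch (mine, not from the thread) verifying the shape of the suggestion above: the `toIterator.flatMap` expression yields an Iterator that produces pairs only as they are pulled:

```scala
// The suggested shape: an Iterator over all pairs, produced lazily.
object IteratorFlatMapSketch {
  def main(args: Array[String]): Unit = {
    val values = Seq("a", "b", "c")
    val it = values.toIterator.flatMap(v1 =>
      values.toIterator.map(v2 => ((v1, v2), 1)))
    // Pulling only the first two elements never materializes the rest.
    assert(it.take(2).toList == List((("a", "a"), 1), (("a", "b"), 1)))
  }
}
```

Note that the inner `values.toIterator` is re-evaluated for each v1, which is what makes nesting iterators this way valid.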
Re: flatMap and spilling of output to disk
for (v1 <- values; v2 <- values) yield ((v1, v2), 1)

will generate all data at once and return all of it to flatMap. To solve your problem, you should use

for (v1 <- values.iterator; v2 <- values.iterator) yield ((v1, v2), 1)

which will generate the data only when it's necessary.

Best Regards,
Shixiong Zhu

2014-12-10 20:13 GMT+08:00 Johannes Simon <johannes.si...@mail.de>:
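The lazy variant suggested above can be checked in isolation (a sketch of mine, not from the thread): a for-comprehension over `.iterator` desugars to `values.iterator.flatMap(v1 => values.iterator.map(v2 => ...))` and therefore returns an Iterator that produces one pair at a time:

```scala
// Lazy counterpart of the eager for-comprehension: same n^2 pairs,
// but produced one at a time as the consumer pulls them.
object LazyCartesianSketch {
  def main(args: Array[String]): Unit = {
    val values: Iterable[String] = (1 to 100).map(i => s"v$i")
    val pairs: Iterator[((String, String), Int)] =
      for (v1 <- values.iterator; v2 <- values.iterator) yield ((v1, v2), 1)
    var count = 0
    while (pairs.hasNext) { pairs.next(); count += 1 }
    assert(count == 100 * 100) // all 10,000 pairs seen, never held at once
  }
}
```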
Re: flatMap and spilling of output to disk
Hi! Using an iterator solved the problem! I've been chewing on this for days, so thanks a lot to both of you!! :)

Since an earlier version of my code used a self-join to perform the same thing, and ran into the same problems, I just looked at the implementation of PairRDDFunctions.join (Spark v1.1.1):

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1; w <- pair._2) yield (v, w)
  )
}

Is there a reason not to use an iterator here if possible? Pardon my lack of Scala knowledge... This should in any case cause the same problems I had when the size of the vs/ws gets too large. (Though that question is more of a dev ML question.)

Thanks!
Johannes

On 10.12.2014, at 13:44, Shixiong Zhu <zsxw...@gmail.com> wrote:
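A hedged sketch (mine, not from the thread) of the iterator-based variant being asked about; `joinValues` is a hypothetical standalone helper mirroring the join body, not Spark API:

```scala
// Hypothetical helper mirroring the join body above, but returning an
// Iterator so the per-key (v, w) pairs are produced lazily instead of
// being materialized as a strict collection.
object JoinSketch {
  def joinValues[V, W](vs: Iterable[V], ws: Iterable[W]): Iterator[(V, W)] =
    for (v <- vs.iterator; w <- ws.iterator) yield (v, w)

  def main(args: Array[String]): Unit = {
    val pairs = joinValues(Seq(1, 2), Seq("a", "b"))
    assert(pairs.toList == List((1, "a"), (1, "b"), (2, "a"), (2, "b")))
  }
}
```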
Re: flatMap and spilling of output to disk
Good catch. `join` should use `Iterator`, too. I opened a JIRA here: https://issues.apache.org/jira/browse/SPARK-4824

Best Regards,
Shixiong Zhu

2014-12-10 21:35 GMT+08:00 Johannes Simon <johannes.si...@mail.de>: