[ https://issues.apache.org/jira/browse/SPARK-4818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen resolved SPARK-4818.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.1
                   1.3.0
                   1.1.2

Issue resolved by pull request 3671
[https://github.com/apache/spark/pull/3671]

> Join operation should use iterator/lazy evaluation
> --------------------------------------------------
>
>                 Key: SPARK-4818
>                 URL: https://issues.apache.org/jira/browse/SPARK-4818
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.1
>            Reporter: Johannes Simon
>             Fix For: 1.1.2, 1.3.0, 1.2.1
>
>
> The current implementation of the join operation does not use an iterator
> (i.e. lazy evaluation), causing it to explicitly evaluate the co-grouped
> values. In big data applications, these value collections can be very large.
> This causes the *cartesian product of all co-grouped values* for a specific
> key of both RDDs to be kept in memory during the flatMapValues operation,
> resulting in an *O(size(pair._1)*size(pair._2))* memory consumption instead
> of *O(1)*. Very large value collections will therefore cause "GC overhead
> limit exceeded" exceptions and fail the task, or at least slow down execution
> dramatically.
> {code:title=PairRDDFunctions.scala|borderStyle=solid}
> //...
> def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
>   this.cogroup(other, partitioner).flatMapValues( pair =>
>     for (v <- pair._1; w <- pair._2) yield (v, w)
>   )
> }
> //...
> {code}
> Since cogroup returns an Iterable instance of an Array, the join
> implementation could be changed to the following, which uses lazy evaluation
> instead, and has almost no memory overhead:
> {code:title=PairRDDFunctions.scala|borderStyle=solid}
> //...
> def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
>   this.cogroup(other, partitioner).flatMapValues( pair =>
>     for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
>   )
> }
> //...
> {code}
> Alternatively, if the current implementation is intentionally not using lazy
> evaluation for some reason, there could be a *lazyJoin()* method next to the
> original join implementation that utilizes lazy evaluation. This of course
> applies to other join operations as well.
> Thanks! :)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
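The following is a minimal Scala sketch of the eager-versus-lazy difference described in the issue. It does not need Spark and uses only the standard library. `JoinSketch`, `eagerPairs`, and `lazyPairs` are hypothetical helpers that mirror the two `flatMapValues` bodies quoted above: the original `for` over the `Iterable`s and the proposed `for` over `.iterator`. The `onPair` callback is an illustrative addition whose only job is to count how many `(v, w)` tuples have been constructed.

```scala
// Plain-Scala sketch; no Spark dependency. The helpers below are hypothetical
// and only mirror the two flatMapValues bodies quoted in SPARK-4818.
object JoinSketch {

  // Eager: a for-comprehension over two Iterables desugars to
  // vs.flatMap(v => ws.map(w => ...)). For strict collections such as Vector,
  // this builds every (v, w) pair before returning.
  def eagerPairs[V, W](vs: Iterable[V], ws: Iterable[W], onPair: () => Unit): Iterable[(V, W)] =
    for (v <- vs; w <- ws) yield { onPair(); (v, w) }

  // Lazy: the same comprehension over iterators returns an Iterator. A pair is
  // only constructed when next() is called on it.
  def lazyPairs[V, W](vs: Iterable[V], ws: Iterable[W], onPair: () => Unit): Iterator[(V, W)] =
    for (v <- vs.iterator; w <- ws.iterator) yield { onPair(); (v, w) }

  def main(args: Array[String]): Unit = {
    // Stand-ins for the two co-grouped value collections of a single key.
    val vs: Iterable[Int] = Vector.range(0, 300)
    val ws: Iterable[String] = Vector.tabulate(300)(i => s"w$i")

    var built = 0
    val all = eagerPairs(vs, ws, () => built += 1)
    println(s"eager: $built pairs built, ${all.size} held in memory")

    built = 0
    val firstThree = lazyPairs(vs, ws, () => built += 1).take(3).toList
    println(s"lazy: $built pairs built to consume $firstThree")
  }
}
```

With the eager version, all size(pair._1)*size(pair._2) tuples for a key exist before downstream code sees the first one. With the iterator version, tuples are built one `next()` call at a time, so consuming three of them builds only three. In both versions, the per-key value collections returned by `cogroup` are still buffered in memory. The change only stops the cross product itself from being materialized.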