[ https://issues.apache.org/jira/browse/SPARK-10476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Simeon Simeonov updated SPARK-10476:
------------------------------------
    Summary: Add common RDD operations on standard Scala collections  (was: Add common RDD API methods to standard Scala collections)

> Add common RDD operations on standard Scala collections
> --------------------------------------------------------
>
>                 Key: SPARK-10476
>                 URL: https://issues.apache.org/jira/browse/SPARK-10476
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Simeon Simeonov
>            Priority: Minor
>              Labels: core, mapPartitions, rdd
>
> A common pattern in Spark development is to look for opportunities to leverage data locality using mechanisms such as {{mapPartitions}}. Often this happens when an existing set of RDD transformations is refactored to improve performance. At that point, significant code refactoring may be required because the input is an {{Iterator\[T]}} as opposed to an RDD. The most common examples we've encountered so far involve the {{*ByKey}} methods, {{sample}} and {{takeSample}}. We have also observed cases where, due to changes in the structure of the data, use of {{mapPartitions}} is no longer possible and the code has to be converted back to the RDD API.
>
> If data manipulation through the RDD API could be applied to the standard Scala data structures, then refactoring Spark data pipelines would become faster and less bug-prone. Also, and this is no small benefit, the thoughtfulness and experience of the Spark community could spread to the broader Scala community.
>
> There are multiple approaches to solving this problem, including but not limited to creating a set of {{Local*RDD}} classes and/or adding implicit conversions.
>
> Here is a simple example, meant to be short as opposed to complete or performance-optimized:
> {code}
> implicit class LocalRDD[T](it: Iterator[T]) extends Iterable[T] {
>   def this(collection: Iterable[T]) = this(collection.toIterator)
>   def iterator = it
> }
>
> implicit class LocalPairRDD[K, V](it: Iterator[(K, V)]) extends Iterable[(K, V)] {
>   def this(collection: Iterable[(K, V)]) = this(collection.toIterator)
>   def iterator = it
>   def groupByKey() = new LocalPairRDD[K, Iterable[V]](
>     groupBy(_._1).map { case (k, valuePairs) => (k, valuePairs.map(_._2)) }
>   )
> }
>
> sc.
>   parallelize(Array((1, 10), (2, 10), (1, 20))).
>   repartition(1).
>   mapPartitions(data => data.groupByKey().toIterator).
>   take(2)
> // Array[(Int, Iterable[Int])] = Array((2,List(10)), (1,List(10, 20)))
> {code}
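> To illustrate the {{*ByKey}} and {{sample}} cases mentioned above, here is a minimal sketch of how such operations might be exposed on the local wrappers. The class names ({{LocalPairRDDOps}}, {{LocalRDDOps}}) and the simplified {{sample}} signature (no replacement, fixed default seed) are hypothetical and chosen only for illustration, not a finished design:
> {code}
> // Hypothetical pair wrapper adding an RDD-style reduceByKey over local data.
> implicit class LocalPairRDDOps[K, V](it: Iterator[(K, V)]) extends Iterable[(K, V)] {
>   def this(collection: Iterable[(K, V)]) = this(collection.toIterator)
>   def iterator = it
>   // Combine all values for each key with the supplied function, as reduceByKey does on an RDD.
>   def reduceByKey(f: (V, V) => V) = new LocalPairRDDOps[K, V](
>     groupBy(_._1).map { case (k, pairs) => (k, pairs.map(_._2).reduce(f)) }.toIterator
>   )
> }
>
> // Hypothetical wrapper adding a simplified sample (without replacement) over local data.
> implicit class LocalRDDOps[T](it: Iterator[T]) extends Iterable[T] {
>   def this(collection: Iterable[T]) = this(collection.toIterator)
>   def iterator = it
>   // Keep each element independently with probability `fraction`.
>   def sample(fraction: Double, seed: Long = 42L) = {
>     val rng = new scala.util.Random(seed)
>     new LocalRDDOps[T](iterator.filter(_ => rng.nextDouble() < fraction))
>   }
> }
>
> // Usage inside mapPartitions, analogous to the groupByKey example above:
> sc.
>   parallelize(Array((1, 10), (2, 10), (1, 20))).
>   repartition(1).
>   mapPartitions(data => data.reduceByKey(_ + _).toIterator).
>   collect()
> // yields (1,30) and (2,10), in some partition-dependent order
> {code}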