Repository: spark
Updated Branches:
  refs/heads/master 9b18009b8 -> 456c11f15
[SPARK-5440][pyspark] Add toLocalIterator to pyspark rdd

Since Java and Scala both have access to iterate over partitions via the
"toLocalIterator" function, python should also have that same ability.

Author: Michael Nazario <[email protected]>

Closes #4237 from mnazario/feature/toLocalIterator and squashes the following commits:

1c58526 [Michael Nazario] Fix documentation off by one error
0cdc8f8 [Michael Nazario] Add toLocalIterator to PySpark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/456c11f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/456c11f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/456c11f1

Branch: refs/heads/master
Commit: 456c11f15aec809044d8bdbdcce0ae05533fb44b
Parents: 9b18009
Author: Michael Nazario <[email protected]>
Authored: Wed Jan 28 12:47:12 2015 -0800
Committer: Josh Rosen <[email protected]>
Committed: Wed Jan 28 12:47:12 2015 -0800

----------------------------------------------------------------------
 python/pyspark/rdd.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/456c11f1/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index efd2f35..014c0aa 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2059,6 +2059,20 @@ class RDD(object):
         hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF)
         return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
 
+    def toLocalIterator(self):
+        """
+        Return an iterator that contains all of the elements in this RDD.
+        The iterator will consume as much memory as the largest partition in this RDD.
+        >>> rdd = sc.parallelize(range(10))
+        >>> [x for x in rdd.toLocalIterator()]
+        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
+        """
+        partitions = xrange(self.getNumPartitions())
+        for partition in partitions:
+            rows = self.context.runJob(self, lambda x: x, [partition])
+            for row in rows:
+                yield row
+
 class PipelinedRDD(RDD):
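The shape of the new method (run a job on one partition at a time and yield its rows, so memory peaks at the largest partition rather than the whole dataset) can be sketched without Spark. `FakeRDD`, `run_job`, and `to_local_iterator` below are hypothetical stand-ins for illustration, not PySpark API:

```python
class FakeRDD:
    """Minimal stand-in mimicking the partition-at-a-time iterator pattern."""

    def __init__(self, partitions):
        # A list of lists, one inner list per "partition".
        self._partitions = partitions

    def get_num_partitions(self):
        return len(self._partitions)

    def run_job(self, partition_index):
        # Stands in for SparkContext.runJob on a single partition:
        # only this partition is materialized locally.
        return list(self._partitions[partition_index])

    def to_local_iterator(self):
        # Same control flow as the committed PySpark code: loop over
        # partition indices, fetch each one with a separate job, and
        # yield rows so only one partition is resident at a time.
        for partition in range(self.get_num_partitions()):
            for row in self.run_job(partition):
                yield row


rdd = FakeRDD([[0, 1, 2], [3, 4], [5, 6, 7, 8, 9]])
print(list(rdd.to_local_iterator()))  # -> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Because the body is a generator, no partition is fetched until the caller actually iterates, which is also why the real method returns a lazy iterator rather than a list like `collect()` does.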
