[ https://issues.apache.org/jira/browse/SPARK-6416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-6416.
------------------------------
          Resolution: Fixed
            Assignee: Sean Owen
       Fix Version/s: 1.5.0
    Target Version/s:   (was: 2.0.0)

Hm! I should have tried that myself. I think that's a good argument that at least it's not inconsistent. The behavior was documented by an earlier change, so I'm calling this resolved, retroactively.

> RDD.fold() requires the operator to be commutative
> --------------------------------------------------
>
>                 Key: SPARK-6416
>                 URL: https://issues.apache.org/jira/browse/SPARK-6416
>             Project: Spark
>          Issue Type: Bug
>          Components: Documentation, Spark Core
>    Affects Versions: 1.4.0
>            Reporter: Josh Rosen
>            Assignee: Sean Owen
>            Priority: Critical
>             Fix For: 1.5.0
>
>
> Spark's {{RDD.fold}} operation has some confusing behaviors when a
> non-commutative reduce function is used.
> Here's an example, which was originally reported on StackOverflow
> (https://stackoverflow.com/questions/29150202/pyspark-fold-method-output):
> {code}
> >>> sc.parallelize([1, 25, 8, 4, 2]).fold(0, lambda a, b: a + 1)
> 8
> {code}
> To understand what's going on here, let's look at the definition of Spark's
> {{fold}} operation. I'm going to show the Python version of the code, but
> the Scala version exhibits the exact same behavior (you can also [browse the source on GitHub|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/python/pyspark/rdd.py#L780]):
> {code}
> def fold(self, zeroValue, op):
>     """
>     Aggregate the elements of each partition, and then the results for all
>     the partitions, using a given associative function and a neutral "zero
>     value."
>
>     The function C{op(t1, t2)} is allowed to modify C{t1} and return it
>     as its result value to avoid object allocation; however, it should not
>     modify C{t2}.
>
>     >>> from operator import add
>     >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
>     15
>     """
>     def func(iterator):
>         acc = zeroValue
>         for obj in iterator:
>             acc = op(obj, acc)
>         yield acc
>     vals = self.mapPartitions(func).collect()
>     return reduce(op, vals, zeroValue)
> {code}
> (For comparison, see the [Scala implementation of RDD.fold|https://github.com/apache/spark/blob/8cb23a1f9a3ed08e57865bcb6cc1cc7902881073/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L943].)
> Spark's {{fold}} operates by first folding each partition and then folding
> the results. The problem is that an empty partition gets folded down to the
> zero element, so the final driver-side fold ends up folding one value for
> _every_ partition rather than one value for each _non-empty_ partition.
> This means that the result of {{fold}} is sensitive to the number of
> partitions:
> {code}
> >>> sc.parallelize([1, 25, 8, 4, 2], 100).fold(0, lambda a, b: a + 1)
> 100
> >>> sc.parallelize([1, 25, 8, 4, 2], 50).fold(0, lambda a, b: a + 1)
> 50
> >>> sc.parallelize([1, 25, 8, 4, 2], 1).fold(0, lambda a, b: a + 1)
> 1
> {code}
> In this last case, what's happening is that the single partition is folded
> down to the correct value, and then that value is folded with the zero
> value at the driver to yield 1.
> I think the underlying problem here is that our {{fold()}} operation
> implicitly requires the operator to be commutative in addition to
> associative, but this isn't documented anywhere. Due to ordering
> non-determinism elsewhere in Spark, such as SPARK-5750, I don't think
> there's an easy way to fix this.
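> To see the partition sensitivity without a cluster, here is a minimal
> plain-Python sketch of the two-level fold above (the {{simulate_fold}}
> helper is hypothetical, not part of Spark's API):
> {code}
> from functools import reduce
>
> def simulate_fold(partitions, zeroValue, op):
>     def func(iterator):                 # mirrors the per-partition fold above
>         acc = zeroValue
>         for obj in iterator:
>             acc = op(obj, acc)
>         return acc
>     vals = [func(part) for part in partitions]  # empty partitions yield zeroValue
>     return reduce(op, vals, zeroValue)          # driver-side fold
>
> op = lambda a, b: a + 1    # ignores its second argument, so not commutative
> data = [1, 25, 8, 4, 2]
> print(simulate_fold([data], 0, op))                           # 1 partition  -> 1
> print(simulate_fold([[x] for x in data], 0, op))              # 5 partitions -> 5
> print(simulate_fold([[x] for x in data] + [[]] * 95, 0, op))  # 100 partitions -> 100
> {code}
> With a commutative, associative operator and an identity zero value (for
> example, {{add}} and {{0}}), all three calls would agree.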
> Therefore, I think we should update the documentation and examples to
> clarify this requirement and explain that our {{fold}} acts more like a
> reduce with a default value than the type of ordering-sensitive {{fold()}}
> that users may expect in functional languages.
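> To illustrate the distinction, here is a sketch in plain Python (not Spark
> code): an order-dependent operator is safe in a sequential left fold, but
> only a commutative and associative operator gives a partition-independent
> result from Spark's {{fold}}:
> {code}
> from functools import reduce
> from operator import add
>
> data = [1, 2, 3, 4, 5]
>
> # An ordering-sensitive left fold, as in many functional languages:
> # each element is combined exactly once, strictly left to right.
> print(reduce(lambda acc, x: acc * 10 + x, data, 0))   # 12345
>
> # Spark's fold is only predictable for commutative, associative operators
> # with an identity zero value, e.g. sc.parallelize(data, n).fold(0, add)
> # returns 15 for any n, matching a plain reduce with a default:
> print(reduce(add, data, 0))                           # 15
> {code}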