[ https://issues.apache.org/jira/browse/SPARK-6551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381807#comment-14381807 ]
Sean Owen commented on SPARK-6551:
----------------------------------

FWIW an equivalent example works as expected in Scala, and the scaladoc says you should be able to mutate the first argument.

{code}
val data = sc.parallelize(0 until 10)
def seqOp(a: Array[Int], b: Int) = {
  a(0) += 1
  a
}
def combOp(a: Array[Int], b: Array[Int]) = {
  a(0) += b(0)
  a
}
data.aggregate(new Array[Int](1))(seqOp, combOp)
10
{code}

> Incorrect aggregate results if seqOp(...) mutates its first argument
> --------------------------------------------------------------------
>
> Key: SPARK-6551
> URL: https://issues.apache.org/jira/browse/SPARK-6551
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.3.0
> Environment: Amazon EMR, AMI version 3.5
> Reporter: Jarno Seppanen
>
> Python RDD.aggregate method doesn't match its documentation w.r.t. seqOp
> mutating its first argument.
> * the results are incorrect if seqOp mutates its first argument
> * additionally, the zero value is modified if combOp mutates its first
> argument (this is slightly surprising, would be nice to document)
> I'm aggregating the RDD into a nontrivial data structure, and it would be
> wasteful to copy the whole data structure into a new instance in every seqOp,
> so mutation is an important feature.
> I'm seeing the following behavior:
> {code}
> def inc_mutate(counter, item):
>     counter[0] += 1
>     return counter
>
> def inc_pure(counter, item):
>     return [counter[0] + 1]
>
> def merge_mutate(c1, c2):
>     c1[0] += c2[0]
>     return c1
>
> def merge_pure(c1, c2):
>     return [c1[0] + c2[0]]
>
> # correct answer, when neither function mutates their arguments
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_pure, merge_pure)
> # [10]
> init
> # [0]
>
> # incorrect answer if seqOp mutates its first argument
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_pure)
> # [20] <- WRONG
> init
> # [0]
>
> # zero value is modified if combOp mutates its first argument
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_pure, merge_mutate)
> # [10]
> init
> # [10]
>
> # for completeness
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_mutate)
> # [20]
> init
> # [20]
> {code}
> I'm running on an EMR cluster launched with:
> {code}
> aws emr create-cluster --name jarno-spark \
>  --ami-version 3.5 \
>  --instance-type c3.8xlarge \
>  --instance-count 5 \
>  --ec2-attributes KeyName=foo \
>  --applications Name=Ganglia \
>  --log-uri s3://foo/log \
>  --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-g,-x,-l,ERROR]
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
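For comparison with the reported behavior, here is a minimal pure-Python sketch (no Spark required) of the semantics the issue asks for: every partition starts from its own deep copy of the zero value, and the final merge starts from yet another copy, so seqOp and combOp may both mutate and return their first argument without double counting or modifying `init`. `simulate_aggregate` and the two-way split of `range(10)` are hypothetical illustrations, and isolating the zero value with `copy.deepcopy` is an assumed strategy; this is not PySpark's actual implementation.

```python
import copy
from functools import reduce


# The four functions below are copied from the issue report.
def inc_mutate(counter, item):
    counter[0] += 1
    return counter


def inc_pure(counter, item):
    return [counter[0] + 1]


def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1


def merge_pure(c1, c2):
    return [c1[0] + c2[0]]


def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    """Hypothetical stand-in for RDD.aggregate over in-memory partitions.

    Assumption: deep-copying the zero value per partition and for the final
    merge is what makes mutation safe. This is a sketch, not PySpark code.
    """
    partials = []
    for part in partitions:
        # Each partition gets a private accumulator, so seq_op may mutate it.
        acc = copy.deepcopy(zero_value)
        for item in part:
            acc = seq_op(acc, item)
        partials.append(acc)
    # The final merge also starts from a private copy, so a comb_op that
    # mutates its first argument never touches the caller's zero_value.
    return reduce(comb_op, partials, copy.deepcopy(zero_value))


if __name__ == "__main__":
    # Hypothetical split of range(10) into two partitions.
    parts = [list(range(0, 5)), list(range(5, 10))]
    for seq_op in (inc_pure, inc_mutate):
        for comb_op in (merge_pure, merge_mutate):
            init = [0]
            result = simulate_aggregate(parts, init, seq_op, comb_op)
            # Every combination prints [10] for the result and [0] for init.
            print(seq_op.__name__, comb_op.__name__, result, init)
```

In this sketch all four combinations from the report return `[10]` and leave `init` at `[0]`. The cost is one deep copy of the zero value per partition (plus one for the merge) rather than one per seqOp call, which keeps mutation cheap for the large accumulators the reporter describes.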