[ https://issues.apache.org/jira/browse/SPARK-6551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jarno Seppanen updated SPARK-6551:
----------------------------------
    Description: 
Python RDD.aggregate method doesn't match its documentation w.r.t. seqOp mutating its first argument.
* the results are incorrect if seqOp mutates its first argument
* additionally, the zero value is modified if combOp mutates its first argument (this is slightly surprising; it would be nice to document)

I'm aggregating the RDD into a nontrivial data structure, and it would be wasteful to copy the whole data structure into a new instance in every seqOp, so mutation is an important feature.

I'm seeing the following behavior:

{code}
def inc_mutate(counter, item):
    counter[0] += 1
    return counter

def inc_pure(counter, item):
    return [counter[0] + 1]

def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1

def merge_pure(c1, c2):
    return [c1[0] + c2[0]]

# correct answer, when neither function mutates its arguments
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_pure, merge_pure)
# [10]
init
# [0]

# incorrect answer if seqOp mutates its first argument
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_pure)
# [20] <- WRONG
init
# [0]

# zero value is modified if combOp mutates its first argument
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_pure, merge_mutate)
# [10]
init
# [10]

# for completeness
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_mutate)
# [20]
init
# [20]
{code}

I'm running on an EMR cluster launched with:

{code}
aws emr create-cluster --name jarno-spark \
  --ami-version 3.5 \
  --instance-type c3.8xlarge \
  --instance-count 5 \
  --ec2-attributes KeyName=foo \
  --applications Name=Ganglia \
  --log-uri s3://foo/log \
  --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-g,-x,-l,ERROR]
{code}

  was:
Python RDD.aggregate method doesn't match its documentation w.r.t. seqOp or combOp mutating their first argument.
* the results are incorrect if seqOp mutates its first argument
* the zero value is modified if combOp mutates its first argument

I'm seeing the following behavior:

{code}
def inc_mutate(counter, item):
    counter[0] += 1
    return counter

def inc_pure(counter, item):
    return [counter[0] + 1]

def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1

def merge_pure(c1, c2):
    return [c1[0] + c2[0]]

# correct answer, when neither function mutates its arguments
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_pure, merge_pure)
# [10]
init
# [0]

# incorrect answer if seqOp mutates its first argument
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_pure)
# [20] <- WRONG
init
# [0]

# zero value is modified if combOp mutates its first argument
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_pure, merge_mutate)
# [10]
init
# [10] <- funny behavior (though not documented)

# for completeness
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_mutate)
# [20]
init
# [20]
{code}

        Summary: Incorrect aggregate results if seqOp(...) mutates its first argument  (was: Incorrect aggregate results if op(...) mutates first argument)

> Incorrect aggregate results if seqOp(...) mutates its first argument
> --------------------------------------------------------------------
>
>                 Key: SPARK-6551
>                 URL: https://issues.apache.org/jira/browse/SPARK-6551
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.3.0
>         Environment: Amazon EMR, AMI version 3.5
>            Reporter: Jarno Seppanen
>
> Python RDD.aggregate method doesn't match its documentation w.r.t. seqOp
> mutating its first argument.
> * the results are incorrect if seqOp mutates its first argument
> * additionally, the zero value is modified if combOp mutates its first
> argument (this is slightly surprising, would be nice to document)
>
> I'm aggregating the RDD into a nontrivial data structure, and it would be
> wasteful to copy the whole data structure into a new instance in every seqOp,
> so mutation is an important feature.
>
> I'm seeing the following behavior:
>
> {code}
> def inc_mutate(counter, item):
>     counter[0] += 1
>     return counter
>
> def inc_pure(counter, item):
>     return [counter[0] + 1]
>
> def merge_mutate(c1, c2):
>     c1[0] += c2[0]
>     return c1
>
> def merge_pure(c1, c2):
>     return [c1[0] + c2[0]]
>
> # correct answer, when neither function mutates its arguments
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_pure, merge_pure)
> # [10]
> init
> # [0]
>
> # incorrect answer if seqOp mutates its first argument
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_pure)
> # [20] <- WRONG
> init
> # [0]
>
> # zero value is modified if combOp mutates its first argument
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_pure, merge_mutate)
> # [10]
> init
> # [10]
>
> # for completeness
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_mutate)
> # [20]
> init
> # [20]
> {code}
>
> I'm running on an EMR cluster launched with:
>
> {code}
> aws emr create-cluster --name jarno-spark \
>   --ami-version 3.5 \
>   --instance-type c3.8xlarge \
>   --instance-count 5 \
>   --ec2-attributes KeyName=foo \
>   --applications Name=Ganglia \
>   --log-uri s3://foo/log \
>   --bootstrap-actions
> Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-g,-x,-l,ERROR]
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
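For anyone wanting to reproduce the aliasing hazard without a cluster, the doubled count reported above can be illustrated in plain Python. This is a minimal sketch, not Spark's actual implementation: `fold` is a hypothetical stand-in for a per-partition aggregation, and it deliberately reuses one mutable zero-value object across two "partitions" so that a mutating seqOp counts items twice.

```python
import copy

def inc_mutate(counter, item):
    counter[0] += 1   # mutates the accumulator in place
    return counter

def fold(zero, seq_op, items):
    acc = zero        # deliberately no copy: acc aliases the caller's zero value
    for item in items:
        acc = seq_op(acc, item)
    return acc

# Two simulated partitions of 5 items each, folded from the SAME zero object.
zero = [0]
part1 = fold(zero, inc_mutate, range(5))   # mutates zero in place -> [5]
part2 = fold(zero, inc_mutate, range(5))   # starts from the already-mutated zero
print(part1, part2)                        # [10] [10] -- both alias one doubly-counted list

# Deep-copying the zero value per partition gives each fold a fresh accumulator.
zero = [0]
part1 = fold(copy.deepcopy(zero), inc_mutate, range(5))
part2 = fold(copy.deepcopy(zero), inc_mutate, range(5))
print(part1, part2, zero)                  # [5] [5] [0]
```

The per-partition deep copy is the behavior the RDD.aggregate documentation leads a user to expect; whether PySpark copies the zero value this way internally is exactly what this issue questions.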