[ 
https://issues.apache.org/jira/browse/SPARK-6551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381807#comment-14381807
 ] 

Sean Owen commented on SPARK-6551:
----------------------------------

FWIW an equivalent example works as expected in Scala, and the scaladoc says 
you should be able to mutate the first argument. 

{code}
val data = sc.parallelize(0 until 10)

def seqOp(a: Array[Int], b: Int) = {
  a(0) += 1
  a
}

def combOp(a: Array[Int], b: Array[Int]) = {
  a(0) += b(0)
  a
}

data.aggregate(new Array[Int](1))(seqOp, combOp)

// result: 10
{code}
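
For comparison, here is a minimal pure-Python sketch of the semantics the documentation describes. It is not PySpark's implementation. The helper `local_aggregate` and the two-way partition split are hypothetical, and the sketch assumes that every partition, and the final merge, starts from its own deep copy of the zero value. Under that assumption the reporter's mutating functions give the expected count and leave `init` untouched.

```python
import copy
from functools import reduce


def local_aggregate(partitions, zero_value, seq_op, comb_op):
    """Hypothetical local model of aggregate; not Spark code.

    Assumption: each partition and the final merge start from a
    private deep copy of zero_value, so in-place mutation is safe.
    """
    partials = []
    for part in partitions:
        acc = copy.deepcopy(zero_value)  # private accumulator per partition
        for item in part:
            acc = seq_op(acc, item)
        partials.append(acc)
    # Merge the per-partition results, again starting from a fresh copy.
    return reduce(comb_op, partials, copy.deepcopy(zero_value))


# The mutating functions from the issue description.
def inc_mutate(counter, item):
    counter[0] += 1
    return counter


def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1


init = [0]
# Hypothetical split of range(10) into two partitions.
partitions = [list(range(0, 5)), list(range(5, 10))]
result = local_aggregate(partitions, init, inc_mutate, merge_mutate)
print(result)  # [10]
print(init)    # [0]
```

The copy happens once per partition rather than once per seqOp call, so it should not add the per-element copying cost that the reporter wants to avoid.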

> Incorrect aggregate results if seqOp(...) mutates its first argument
> --------------------------------------------------------------------
>
>                 Key: SPARK-6551
>                 URL: https://issues.apache.org/jira/browse/SPARK-6551
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.3.0
>         Environment: Amazon EMR, AMI version 3.5
>            Reporter: Jarno Seppanen
>
> The Python RDD.aggregate method doesn't match its documentation with respect 
> to seqOp mutating its first argument:
> * the results are incorrect if seqOp mutates its first argument
> * additionally, the zero value is modified if combOp mutates its first 
> argument (this is slightly surprising and would be nice to document)
> I'm aggregating the RDD into a nontrivial data structure, and it would be 
> wasteful to copy the whole data structure into a new instance in every seqOp 
> call, so mutation is an important feature.
> I'm seeing the following behavior:
> {code}
> def inc_mutate(counter, item):
>     counter[0] += 1
>     return counter
> def inc_pure(counter, item):
>     return [counter[0] + 1]
> def merge_mutate(c1, c2):
>     c1[0] += c2[0]
>     return c1
> def merge_pure(c1, c2):
>     return [c1[0] + c2[0]]
> # correct answer, when neither function mutates its arguments
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_pure, merge_pure)
> # [10]
> init
> # [0]
> # incorrect answer if seqOp mutates its first argument
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_pure)
> # [20] <- WRONG
> init
> # [0]
> # zero value is modified if combOp mutates its first argument
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_pure, merge_mutate)
> # [10]
> init
> # [10]
> # for completeness
> init = [0]
> sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_mutate)
> # [20]
> init
> # [20]
> {code}
> I'm running on an EMR cluster launched with:
> {code}
> aws emr create-cluster --name jarno-spark \
>  --ami-version 3.5 \
>  --instance-type c3.8xlarge \
>  --instance-count 5 \
>  --ec2-attributes KeyName=foo \
>  --applications Name=Ganglia \
>  --log-uri s3://foo/log \
>  --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-g,-x,-l,ERROR]
> {code}


