[ https://issues.apache.org/jira/browse/SPARK-6551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jarno Seppanen updated SPARK-6551:
----------------------------------
    Description: 
The Python RDD.aggregate method doesn't match its documentation when seqOp 
mutates its first argument.

* the results are incorrect if seqOp mutates its first argument
* additionally, the zero value is modified if combOp mutates its first argument 
(this is slightly surprising and would be nice to document)

I'm aggregating the RDD into a nontrivial data structure, and it would be 
wasteful to copy the whole data structure into a new instance in every seqOp, 
so mutation is an important feature.
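For what it's worth, the symptom looks like plain Python aliasing: if the zero value object is handed to more than one partition fold without a defensive copy, every accumulator is the same list. A minimal Spark-free sketch of that hazard (this only models the symptom, not PySpark's actual internals):

```python
def inc_mutate(counter, item):
    counter[0] += 1
    return counter

zero = [0]

# "partition 1" folds starting from the shared zero value
acc1 = zero
for item in range(5):
    acc1 = inc_mutate(acc1, item)

# "partition 2" starts from the SAME object, not a copy
acc2 = zero
for item in range(5):
    acc2 = inc_mutate(acc2, item)

assert acc1 is acc2 is zero  # all three names alias one list
print(acc1)  # [10] -- both "partitions" counted into a single accumulator
```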

I'm seeing the following behavior:

{code}
def inc_mutate(counter, item):
    counter[0] += 1
    return counter

def inc_pure(counter, item):
    return [counter[0] + 1]

def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1

def merge_pure(c1, c2):
    return [c1[0] + c2[0]]

# correct answer, when neither function mutates its arguments
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_pure, merge_pure)
# [10]
init
# [0]

# incorrect answer if seqOp mutates its first argument
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_pure)
# [20] <- WRONG
init
# [0]

# zero value is modified if combOp mutates its first argument
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_pure, merge_mutate)
# [10]
init
# [10]

# for completeness
init = [0]
sc.parallelize(range(10)).aggregate(init, inc_mutate, merge_mutate)
# [20]
init
# [20]
{code}
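For comparison, here is a local, Spark-free model of the semantics I'd expect from the documentation: deep-copy the zero value once per partition (and once more for the combine step), and mutating ops become safe. The name {{aggregate_local}} and the two-partition split are mine; this is a sketch of the expected behavior, not PySpark's implementation.

```python
import copy

def aggregate_local(partitions, zero, seq_op, comb_op):
    """Model of aggregate() over an in-memory list of partitions."""
    partials = []
    for part in partitions:
        acc = copy.deepcopy(zero)      # fresh accumulator per partition
        for item in part:
            acc = seq_op(acc, item)
        partials.append(acc)
    result = copy.deepcopy(zero)       # combine step gets its own copy too
    for partial in partials:
        result = comb_op(result, partial)
    return result

def inc_mutate(counter, item):
    counter[0] += 1
    return counter

def merge_mutate(c1, c2):
    c1[0] += c2[0]
    return c1

init = [0]
parts = [list(range(5)), list(range(5, 10))]
print(aggregate_local(parts, init, inc_mutate, merge_mutate))  # [10]
print(init)  # [0] -- zero value left intact
```

With the defensive copies in place, even the fully mutating pair of functions returns the correct count and leaves the zero value untouched.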

I'm running on an EMR cluster launched with:
{code}
aws emr create-cluster --name jarno-spark \
 --ami-version 3.5 \
 --instance-type c3.8xlarge \
 --instance-count 5 \
 --ec2-attributes KeyName=foo \
 --applications Name=Ganglia \
 --log-uri s3://foo/log \
 --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-g,-x,-l,ERROR]
{code}


        Summary: Incorrect aggregate results if seqOp(...) mutates its first 
argument  (was: Incorrect aggregate results if op(...) mutates first argument)

> Incorrect aggregate results if seqOp(...) mutates its first argument
> --------------------------------------------------------------------
>
>                 Key: SPARK-6551
>                 URL: https://issues.apache.org/jira/browse/SPARK-6551
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.3.0
>         Environment: Amazon EMR, AMI version 3.5
>            Reporter: Jarno Seppanen
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
