[ https://issues.apache.org/jira/browse/SPARK-10685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-10685:
-------------------------------
    Assignee: Reynold Xin

> Misaligned data with RDD.zip and DataFrame.withColumn after repartition
> -----------------------------------------------------------------------
>
>                 Key: SPARK-10685
>                 URL: https://issues.apache.org/jira/browse/SPARK-10685
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 1.3.0, 1.4.1, 1.5.0
>         Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
> - Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5
>            Reporter: Dan Brown
>            Assignee: Reynold Xin
>            Priority: Blocker
>             Fix For: 1.6.0, 1.5.1
>
>
> Here's a weird behavior where {{RDD.zip}} or {{DataFrame.withColumn}} after a 
> {{repartition}} produces "misaligned" data, meaning different column values 
> in the same row aren't matched, as if a zip shuffled the collections before 
> zipping them. It's difficult to reproduce because it's nondeterministic, 
> doesn't occur in local mode, and requires ≥2 workers (≥3 in one case). I was 
> able to repro it using pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop), 
> and 1.5.0 (bin-without-hadoop).
> Here's the most similar issue I was able to find. It appears not to have been 
> reproduced before being closed optimistically, and it smells like it could have 
> been the same underlying cause, never fixed:
> - https://issues.apache.org/jira/browse/SPARK-9131
> Also, this {{DataFrame.zip}} issue is related in spirit, since we were trying 
> to build it ourselves when we ran into this problem (see the sketch just below 
> the link). Let me put in my vote for reopening that issue and supporting 
> {{DataFrame.zip}} in the standard lib.
> - https://issues.apache.org/jira/browse/SPARK-7460
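> For context, here's roughly the kind of user-level zip we were building when 
> we hit this (a sketch only; {{zip_dataframes}} is our own hypothetical helper, 
> not a Spark API), and it's exactly the pattern that trips the misalignment 
> below:
> {code}
> from pyspark.sql.types import StructType
> 
> def zip_dataframes(sqlCtx, left, right):
>     # Pair rows positionally via RDD.zip, then flatten each pair of Rows
>     # (Rows are tuples) into one combined row.
>     rdd = left.rdd.zip(right.rdd).map(lambda (x, y): tuple(x) + tuple(y))
>     schema = StructType(left.schema.fields + right.schema.fields)
>     return sqlCtx.createDataFrame(rdd, schema)
> {code}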
> h3. Brief repro
> Fail: withColumn(udf) after DataFrame.repartition
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> [r for r in df.collect() if r.a != r.b][:3] # Should be []
> {code}
> Sample outputs (nondeterministic):
> {code}
> [Row(a=39, b=639), Row(a=139, b=739), Row(a=239, b=839)]
> [Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
> []
> [Row(a=641, b=41), Row(a=741, b=141), Row(a=841, b=241)]
> [Row(a=641, b=1343), Row(a=741, b=1443), Row(a=841, b=1543)]
> [Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
> {code}
> Fail: RDD.zip after DataFrame.repartition
> {code}
> df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df  = df.repartition(100)
> rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
> [r for r in rdd.collect() if r.a != r.b][:3] # Should be []
> {code}
> Sample outputs (nondeterministic):
> {code}
> []
> [Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
> []
> []
> [Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
> []
> {code}
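> Since the failure is nondeterministic, a harness like this sketch (our own 
> loop, nothing Spark-specific) makes it easier to catch a bad run:
> {code}
> # Run the zip repro several times; misalignment only shows up on some runs.
> for trial in xrange(10):
>     df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
>     df = df.repartition(100)
>     rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a)))
>     bad = [(x, y) for (x, y) in rdd.collect() if x.a != y.b]
>     print trial, len(bad), bad[:3]
> {code}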
> Test setup:
> - local\[8]: {{MASTER=local\[8]}}
> - dist\[N]: 1 driver + 1 master + N workers
> {code}
> "Fail" tests pass?  cluster mode  spark version
> ----------------------------------------------------
> yes                 local[8]      1.3.0-cdh5.4.5
> no                  dist[4]       1.3.0-cdh5.4.5
> yes                 local[8]      1.4.1
> yes                 dist[1]       1.4.1
> no                  dist[2]       1.4.1
> no                  dist[4]       1.4.1
> yes                 local[8]      1.5.0
> yes                 dist[1]       1.5.0
> no                  dist[2]       1.5.0
> no                  dist[4]       1.5.0
> {code}
> h3. Detailed repro
> Start {{pyspark}} and run these imports:
> {code}
> from pyspark.sql import Row
> from pyspark.sql.functions import udf
> from pyspark.sql.types import IntegerType, StructType, StructField
> {code}
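> The snippets below also assume the {{sc}} and {{sqlCtx}} that the {{pyspark}} 
> shell provides. If you run them as a script instead, this is a sketch of the 
> equivalent setup:
> {code}
> from pyspark import SparkContext
> from pyspark.sql import SQLContext
> 
> sc = SparkContext(appName='spark-10685-repro')
> sqlCtx = SQLContext(sc)
> {code}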
> Fail: withColumn(udf) after DataFrame.repartition
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Ok: withColumn(udf) after DataFrame.repartition(100) after 1 starting partition
> {code}
> df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1))
> df = df.repartition(100)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Fail: withColumn(udf) after DataFrame.repartition(100) after 100 starting partitions
> {code}
> df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
> df = df.repartition(100)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Fail: withColumn(udf) after DataFrame.repartition(1) after 100 starting partitions
> {code}
> df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
> df = df.repartition(1)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Ok: withColumn(udf) after DataFrame.coalesce(10) after 100 starting partitions
> {code}
> df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
> df = df.coalesce(10)
> df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Ok: withColumn without udf
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> df = df.withColumn('b', df.a)
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Ok: createDataFrame(RDD.map) instead of withColumn(udf)
> {code}
> df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df  = df.repartition(100)
> rdd = df.map(lambda r: Row(a=r.a, b=r.a))
> df  = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Fail: createDataFrame(RDD.zip) instead of withColumn(udf)
> {code}
> df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df  = df.repartition(100)
> rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
> df  = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
> len([r for r in df.collect() if r.a != r.b]) # Should be 0
> {code}
> Fail: RDD.zip after DataFrame.repartition
> {code}
> df  = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df  = df.repartition(100)
> rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
> len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
> {code}
> Fail: RDD.zip after RDD.repartition after 100 starting partitions
> - Failure requires ≥3 workers (whether dist or pseudo-dist)
> {code}
> rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100)
> rdd = rdd.repartition(100)
> rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
> len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
> {code}
> Ok: RDD.zip after RDD.repartition after 1 starting partition
> {code}
> rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1)
> rdd = rdd.repartition(100)
> rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a,b): Row(a=a,b=b))
> len([d for d in rdd.collect() if d.a != d.b]) # Should be 0
> {code}
> Test setup:
> - local\[8]: {{MASTER=local\[8]}}
> - pseudo-dist\[N]: 1 driver + 1 master + N workers; master and workers all on the same OS
> - dist\[N]: 1 driver + 1 master + N workers; master and workers all on separate OSes
> - Spark 1.3.0-cdh5.4.5 with dist\[4] didn't trip any of the {{withColumn}} failures, but did trip the {{zip}} failures
> - {{-}} indicates a configuration I didn't try
> {code}
> "Ok" tests pass?  "Fail" tests pass?        platform  cluster mode    spark 
> version
> ----------------------------------------------------------------
> yes               yes                       ubuntu    local[8]        
> 1.3.0-cdh5.4.5
> -                 -                         ubuntu    pseudo-dist[1]  
> 1.3.0-cdh5.4.5
> -                 -                         ubuntu    pseudo-dist[2]  
> 1.3.0-cdh5.4.5
> yes               no[zip], yes[withColumn]  ubuntu    dist[4]         
> 1.3.0-cdh5.4.5
> yes               yes                       osx       local[8]        1.4.1
> yes               yes                       ubuntu    local[8]        1.4.1
> yes               yes                       osx       pseudo-dist[1]  1.4.1
> -                 -                         ubuntu    pseudo-dist[1]  1.4.1
> yes               no                        osx       pseudo-dist[2]  1.4.1
> -                 -                         ubuntu    pseudo-dist[2]  1.4.1
> -                 -                         osx       dist[4]         1.4.1
> yes               no                        ubuntu    dist[4]         1.4.1
> yes               yes                       osx       local[8]        1.5.0
> yes               yes                       ubuntu    local[8]        1.5.0
> yes               yes                       osx       pseudo-dist[1]  1.5.0
> yes               yes                       ubuntu    pseudo-dist[1]  1.5.0
> yes               no                        osx       pseudo-dist[2]  1.5.0
> yes               no                        ubuntu    pseudo-dist[2]  1.5.0
> -                 -                         osx       dist[4]         1.5.0
> yes               no                        ubuntu    dist[4]         1.5.0
> {code}
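> One mitigation I haven't fully verified (a sketch; the assumption is that both 
> sides of the zip then read the same cached partitions, and that cache eviction 
> would reintroduce the nondeterministic recomputation): materialize the 
> repartitioned RDD before zipping.
> {code}
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> df.rdd.cache()
> df.rdd.count()  # force materialization so both zip inputs read the cache
> rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x,y): Row(a=x.a, b=y.b))
> len([r for r in rdd.collect() if r.a != r.b])  # Should be 0
> {code}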


