[ https://issues.apache.org/jira/browse/SPARK-10685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen updated SPARK-10685:
-------------------------------
    Assignee: Reynold Xin

Misaligned data with RDD.zip and DataFrame.withColumn after repartition
------------------------------------------------------------------------

                 Key: SPARK-10685
                 URL: https://issues.apache.org/jira/browse/SPARK-10685
             Project: Spark
          Issue Type: Bug
          Components: PySpark, SQL
    Affects Versions: 1.3.0, 1.4.1, 1.5.0
         Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
                      - Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5
            Reporter: Dan Brown
            Assignee: Reynold Xin
            Priority: Blocker
             Fix For: 1.6.0, 1.5.1

Here's a weird behavior where {{RDD.zip}} or {{DataFrame.withColumn}} after a {{repartition}} produces "misaligned" data, meaning column values in the same row no longer correspond to each other, as if a zip had shuffled the two collections before zipping them. It's difficult to reproduce because it's nondeterministic, doesn't occur in local mode, and requires ≥2 workers (≥3 in one case). I was able to repro it using pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop), and 1.5.0 (bin-without-hadoop).

Here's the most similar issue I was able to find. It appears never to have been repro'd and was closed optimistically; it smells like it could have been the same underlying cause, never fixed:
- https://issues.apache.org/jira/browse/SPARK-9131

Also, this {{DataFrame.zip}} issue is related in spirit, since we were trying to build it ourselves when we ran into this problem. Let me put in my vote for reopening that issue and supporting {{DataFrame.zip}} in the standard lib:
- https://issues.apache.org/jira/browse/SPARK-7460

h3. Brief repro

Fail: withColumn(udf) after DataFrame.repartition
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
[r for r in df.collect() if r.a != r.b][:3]  # Should be []
{code}

Sample outputs (nondeterministic):
{code}
[Row(a=39, b=639), Row(a=139, b=739), Row(a=239, b=839)]
[Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
[]
[Row(a=641, b=41), Row(a=741, b=141), Row(a=841, b=241)]
[Row(a=641, b=1343), Row(a=741, b=1443), Row(a=841, b=1543)]
[Row(a=639, b=39), Row(a=739, b=139), Row(a=839, b=239)]
{code}

Note that within each failing sample the offset {{b - a}} is constant across the rows shown, as if contiguous blocks of rows had been zipped against the wrong blocks.

Fail: RDD.zip after DataFrame.repartition
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x, y): Row(a=x.a, b=y.b))
[r for r in rdd.collect() if r.a != r.b][:3]  # Should be []
{code}

Sample outputs (nondeterministic):
{code}
[]
[Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
[]
[]
[Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
[]
{code}

Test setup:
- local\[8]: {{MASTER=local\[8]}}
- dist\[N]: 1 driver + 1 master + N workers
{code}
"Fail" tests pass?    cluster mode    spark version
----------------------------------------------------
yes                   local[8]        1.3.0-cdh5.4.5
no                    dist[4]         1.3.0-cdh5.4.5
yes                   local[8]        1.4.1
yes                   dist[1]         1.4.1
no                    dist[2]         1.4.1
no                    dist[4]         1.4.1
yes                   local[8]        1.5.0
yes                   dist[1]         1.5.0
no                    dist[2]         1.5.0
no                    dist[4]         1.5.0
{code}
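A possible workaround, on the assumption that each side of the zip independently recomputes the nondeterministic repartition shuffle (which is what the constant offsets above suggest): persist the DataFrame after {{repartition}} and force materialization, so both evaluations read the same cached partitions. This is a minimal sketch of a mitigation, not a fix; it sidesteps rather than resolves the underlying issue:

{code}
# Workaround sketch (assumption: each side of the zip recomputes the
# repartition shuffle independently, and the shuffle's row order is
# nondeterministic). Persisting after repartition materializes one layout
# that both evaluations then share. Not guaranteed to hold if partitions
# are evicted or recomputed after executor loss.
from pyspark import StorageLevel
from pyspark.sql import Row

df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100).persist(StorageLevel.MEMORY_AND_DISK)
df.count()  # force materialization before either side of the zip runs
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x, y): Row(a=x.a, b=y.b))
len([r for r in rdd.collect() if r.a != r.b])  # Should be 0
{code}

The same persist-then-materialize pattern may also apply to the {{withColumn(udf)}} variant, assuming the UDF path recomputes its input the same way.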
h3. Detailed repro

Start {{pyspark}} and run these imports:
{code}
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StructType, StructField
{code}

Fail: withColumn(udf) after DataFrame.repartition
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b])  # Should be 0
{code}

Ok: withColumn(udf) after DataFrame.repartition(100) after 1 starting partition
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b])  # Should be 0
{code}

Fail: withColumn(udf) after DataFrame.repartition(100) after 100 starting partitions
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.repartition(100)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b])  # Should be 0
{code}

Fail: withColumn(udf) after DataFrame.repartition(1) after 100 starting partitions
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.repartition(1)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b])  # Should be 0
{code}

Ok: withColumn(udf) after DataFrame.coalesce(10) after 100 starting partitions
{code}
df = sqlCtx.createDataFrame(sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100))
df = df.coalesce(10)
df = df.withColumn('b', udf(lambda r: r, IntegerType())(df.a))
len([r for r in df.collect() if r.a != r.b])  # Should be 0
{code}

Ok: withColumn without udf
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
df = df.withColumn('b', df.a)
len([r for r in df.collect() if r.a != r.b])  # Should be 0
{code}

Ok: createDataFrame(RDD.map) instead of withColumn(udf)
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.map(lambda r: Row(a=r.a, b=r.a))
df = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
len([r for r in df.collect() if r.a != r.b])  # Should be 0
{code}

Fail: createDataFrame(RDD.zip) instead of withColumn(udf)
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x, y): Row(a=x.a, b=y.b))
df = sqlCtx.createDataFrame(rdd, StructType(df.schema.fields + [StructField('b', IntegerType())]))
len([r for r in df.collect() if r.a != r.b])  # Should be 0
{code}

Fail: RDD.zip after DataFrame.repartition
{code}
df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
df = df.repartition(100)
rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(lambda (x, y): Row(a=x.a, b=y.b))
len([d for d in rdd.collect() if d.a != d.b])  # Should be 0
{code}

Fail: RDD.zip after RDD.repartition after 100 starting partitions
- Failure requires ≥3 workers (whether dist or pseudo-dist)
{code}
rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100)
rdd = rdd.repartition(100)
rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a, b): Row(a=a, b=b))
len([d for d in rdd.collect() if d.a != d.b])  # Should be 0
{code}

Ok: RDD.zip after RDD.repartition after 1 starting partition
{code}
rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=1)
rdd = rdd.repartition(100)
rdd = rdd.zip(rdd.map(lambda a: a)).map(lambda (a, b): Row(a=a, b=b))
len([d for d in rdd.collect() if d.a != d.b])  # Should be 0
{code}
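To observe the suspected nondeterminism directly, here's a diagnostic sketch (again assuming the recomputation hypothesis): evaluate the repartitioned RDD twice and compare the first element of each partition. If {{repartition}}'s output order were stable across recomputations, the two passes would always match; if the hypothesis holds, they can differ on the multi-worker setups below.

{code}
# Diagnostic sketch: evaluate the repartitioned RDD twice and compare the
# first element of every partition across the two passes. A mismatch means
# the shuffle produced a different row layout on recomputation, which is
# exactly the condition under which zip pairs rows from different layouts.
rdd = sc.parallelize((Row(a=a) for a in xrange(10000)), numSlices=100)
rdd = rdd.repartition(100)
first_pass = rdd.mapPartitions(lambda it: [next(it, None)]).collect()
second_pass = rdd.mapPartitions(lambda it: [next(it, None)]).collect()
print first_pass == second_pass  # Expect occasional False on multi-worker clusters
{code}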
Test setup:
- local\[8]: {{MASTER=local\[8]}}
- pseudo-dist\[N]: 1 driver + 1 master + N workers; master and workers all on the same OS
- dist\[N]: 1 driver + 1 master + N workers; master and workers all on separate OS's
- Spark 1.3.0-cdh5.4.5 with dist\[4] didn't trip any of the {{withColumn}} failures, but did trip the {{zip}} failures
- {{-}} indicates a configuration I didn't try

{code}
"Ok" tests pass?   "Fail" tests pass?         platform   cluster mode     spark version
----------------------------------------------------------------------------------------
yes                yes                        ubuntu     local[8]         1.3.0-cdh5.4.5
-                  -                          ubuntu     pseudo-dist[1]   1.3.0-cdh5.4.5
-                  -                          ubuntu     pseudo-dist[2]   1.3.0-cdh5.4.5
yes                no[zip], yes[withColumn]   ubuntu     dist[4]          1.3.0-cdh5.4.5
yes                yes                        osx        local[8]         1.4.1
yes                yes                        ubuntu     local[8]         1.4.1
yes                yes                        osx        pseudo-dist[1]   1.4.1
-                  -                          ubuntu     pseudo-dist[1]   1.4.1
yes                no                         osx        pseudo-dist[2]   1.4.1
-                  -                          ubuntu     pseudo-dist[2]   1.4.1
-                  -                          osx        dist[4]          1.4.1
yes                no                         ubuntu     dist[4]          1.4.1
yes                yes                        osx        local[8]         1.5.0
yes                yes                        ubuntu     local[8]         1.5.0
yes                yes                        osx        pseudo-dist[1]   1.5.0
yes                yes                        ubuntu     pseudo-dist[1]   1.5.0
yes                no                         osx        pseudo-dist[2]   1.5.0
yes                no                         ubuntu     pseudo-dist[2]   1.5.0
-                  -                          osx        dist[4]          1.5.0
yes                no                         ubuntu     dist[4]          1.5.0
{code}