[ https://issues.apache.org/jira/browse/SPARK-10939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948596#comment-14948596 ]
Michael Malak commented on SPARK-10939:
---------------------------------------

Here Matei explains the explicit design decision to favor the shuffle performance gained from randomization over deterministic RDD computation:
https://issues.apache.org/jira/browse/SPARK-3098?focusedCommentId=14110183&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14110183

It has made it into the documentation (though perhaps not clearly enough, especially regarding the rationale):
https://issues.apache.org/jira/browse/SPARK-3356
https://github.com/apache/spark/pull/2508/files

> Misaligned data with RDD.zip after repartition
> ----------------------------------------------
>
>                 Key: SPARK-10939
>                 URL: https://issues.apache.org/jira/browse/SPARK-10939
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.0, 1.4.1, 1.5.0
>         Environment: - OSX 10.10.4, java 1.7.0_51, hadoop 2.6.0-cdh5.4.5
> - Ubuntu 12.04, java 1.7.0_80, hadoop 2.6.0-cdh5.4.5
>            Reporter: Dan Brown
>
> Split out from https://issues.apache.org/jira/browse/SPARK-10685:
>
> Here's a weird behavior where {{RDD.zip}} after a {{repartition}} produces
> "misaligned" data, meaning different column values in the same row aren't
> matched, as if a zip shuffled the collections before zipping them. It's
> difficult to reproduce because it's nondeterministic, doesn't occur in local
> mode, and requires ≥2 workers (≥3 in one case). I was able to repro it using
> pyspark 1.3.0 (cdh5.4.5), 1.4.1 (bin-without-hadoop), and 1.5.0
> (bin-without-hadoop).
>
> Also, this {{DataFrame.zip}} issue is related in spirit, since we were trying
> to build it ourselves when we ran into this problem. Let me put in my vote
> for reopening the issue and supporting {{DataFrame.zip}} in the standard lib.
> - https://issues.apache.org/jira/browse/SPARK-7460
>
> h3. Repro
>
> Fail: RDD.zip after repartition
> {code}
> # Python 2 / pyspark 1.x; sqlCtx is the SQLContext provided by the pyspark shell
> from pyspark.sql import Row
> df = sqlCtx.createDataFrame(Row(a=a) for a in xrange(10000))
> df = df.repartition(100)
> # zip pairs elements by position within each partition
> rdd = df.rdd.zip(df.map(lambda r: Row(b=r.a))).map(
>     lambda (x, y): Row(a=x.a, b=y.b))
> [r for r in rdd.collect() if r.a != r.b][:3]  # Should be []
> {code}
>
> Sample outputs (nondeterministic):
> {code}
> []
> [Row(a=50, b=6947), Row(a=150, b=7047), Row(a=250, b=7147)]
> []
> []
> [Row(a=44, b=644), Row(a=144, b=744), Row(a=244, b=844)]
> []
> {code}
>
> Test setup:
> - local\[8]: {{MASTER=local\[8]}}
> - dist\[N]: 1 driver + 1 master + N workers
> {code}
> "Fail" tests pass?  cluster mode  spark version
> ----------------------------------------------------
> yes                local[8]      1.3.0-cdh5.4.5
> no                 dist[4]       1.3.0-cdh5.4.5
> yes                local[8]      1.4.1
> yes                dist[1]       1.4.1
> no                 dist[2]       1.4.1
> no                 dist[4]       1.4.1
> yes                local[8]      1.5.0
> yes                dist[1]       1.5.0
> no                 dist[2]       1.5.0
> no                 dist[4]       1.5.0
> {code}
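A minimal model in plain Python (not Spark code) of why a positional zip can misalign after a shuffle. It assumes, as a simplification of the randomized shuffle behavior the comment above refers to, that a reduce task may receive its map-output blocks in a different order each time a partition is computed, and that each side of the zip re-reads the same shuffle output. The names `read_partition` and `misaligned` are hypothetical.

```python
from typing import List, Sequence, Tuple


def read_partition(blocks: Sequence[List[int]], fetch_order: Sequence[int]) -> List[int]:
    """Model one reduce-side partition: concatenate the map-output blocks
    in the order they were fetched. Hypothetical model, not Spark code."""
    return [x for i in fetch_order for x in blocks[i]]


def misaligned(left: List[int], right: List[int]) -> List[Tuple[int, int]]:
    """Pair elements by position, as RDD.zip does, and keep the mismatches."""
    return [(a, b) for a, b in zip(left, right) if a != b]


# Three map tasks each wrote one block destined for the same reduce partition.
blocks = [[0, 100], [50, 150], [25, 125]]

# Assumption: each side of the zip re-reads the shuffle, and the blocks
# happen to arrive in a different order the second time.
first = read_partition(blocks, [0, 1, 2])
second = read_partition(blocks, [1, 0, 2])

print(misaligned(first, first))   # []
print(misaligned(first, second))  # [(0, 50), (100, 150), (50, 0), (150, 100)]
```

Both evaluations hold exactly the same elements; only their order differs, and that alone is enough for a positional zip to pair the wrong values. This is consistent with the report that the failure shows up only with multiple workers, though the model does not prove that this is the mechanism.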
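One possible workaround, offered as a hedged suggestion rather than anything this thread prescribes, is to attach an explicit id to each row before the shuffle (in PySpark, for example, with `zipWithIndex()` on the pre-shuffle RDD, assuming that pre-shuffle order is itself deterministic) and then align the two sides by that id with a join instead of by position. The plain-Python sketch below models only the alignment step; `read_partition`, `align_by_key` and the `(row_id, value)` layout are hypothetical.

```python
from typing import Dict, List, Sequence, Tuple

Keyed = Tuple[int, int]  # (row_id, value); the id is assigned before the shuffle


def read_partition(blocks: Sequence[List[Keyed]], fetch_order: Sequence[int]) -> List[Keyed]:
    """Hypothetical model of a reduce partition: concatenate blocks in fetch order."""
    return [kv for i in fetch_order for kv in blocks[i]]


def align_by_key(left: List[Keyed], right: List[Keyed]) -> List[Tuple[int, int, int]]:
    """Match rows by their id (a simple hash join), so fetch order no longer matters."""
    lookup: Dict[int, int] = dict(right)
    return [(row_id, a, lookup[row_id]) for row_id, a in left]


blocks = [[(0, 0), (100, 100)], [(50, 50), (150, 150)], [(25, 25), (125, 125)]]

# Side "a": the rows as read on the first evaluation.
side_a = read_partition(blocks, [0, 1, 2])
# Side "b": derived from the same rows (b = a, as in the repro's map),
# but read with a different block order.
side_b = [(row_id, a) for row_id, a in read_partition(blocks, [1, 0, 2])]

rows = align_by_key(side_a, side_b)
print(all(a == b for _, a, b in rows))  # True
```

For the repro specifically, deriving both columns in a single `map` over one RDD also sidesteps the problem, since nothing then has to be re-aligned.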