[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14737001#comment-14737001 ]
Glenn Strycker commented on SPARK-10493:
----------------------------------------

In this example, our RDDs are partitioned with a hash partitioner, but are not ordered. I think you may be confusing zipPartitions with zipWithIndex... zipPartitions is used to merge two sets partition-wise, which enables a union without requiring any shuffles. We use zipPartitions throughout our code to keep things fast, and then apply partitionBy() periodically so that shuffles happen only when needed. No ordering is required.

We are also not concerned with uniqueness at this point (in fact, for my application I want to keep multiplicity UNTIL the reduceByKey step), so hash collisions and the like are fine for our zipPartitions union step.

While investigating this over the past few days, I went ahead and created an intermediate temp RDD that does the zipPartitions, runs partitionBy, persists, checkpoints, and then materializes the RDD. I think this rules out zipPartitions as the cause of the downstream problems in the main RDD, which only runs reduceByKey on the intermediate RDD.

> reduceByKey not returning distinct results
> -------------------------------------------
>
>                 Key: SPARK-10493
>                 URL: https://issues.apache.org/jira/browse/SPARK-10493
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Glenn Strycker
>
> I am running Spark 1.3.0 and creating an RDD by unioning several earlier RDDs (using zipPartitions), partitioning by a hash partitioner, and then applying a reduceByKey to summarize statistics by key.
>
> Since my set before the reduceByKey consists of records such as (K, V1), (K, V2), (K, V3), I expect the results after reduceByKey to be just (K, f(V1, V2, V3)), where the function f is appropriately associative, commutative, etc. Therefore, the results after reduceByKey ought to be distinct, correct?
>
> I am running counts of my RDD and finding that adding an additional .distinct after my .reduceByKey is changing the final count!
>
> Here is some example code:
>
> rdd3 = tempRDD1.
>   zipPartitions(tempRDD2, true)((iter, iter2) => iter ++ iter2).
>   partitionBy(new HashPartitioner(numPartitions)).
>   reduceByKey((a, b) => (math.Ordering.String.min(a._1, b._1), a._2 + b._2, math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
>
> println(rdd3.count)
>
> rdd4 = rdd3.distinct
> println(rdd4.count)
>
> I am using persistence, checkpointing, and other stuff in my actual code that I did not paste here, so I can paste my actual code if it would be helpful.
>
> This issue may be related to SPARK-2620, except I am not using case classes, to my knowledge.
>
> See also http://stackoverflow.com/questions/32466176/apache-spark-rdd-reducebykey-operation-not-returning-correct-distinct-results
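
For reference, a minimal sketch of the intermediate-RDD step described in the comment above (zipPartitions union, partitionBy, persist, checkpoint, materialize, then reduceByKey). The input RDDs, the checkpoint directory, and the sum-based reduce function are hypothetical stand-ins for illustration, not the reporter's actual code; the point of the layout, per the comment, is that the only shuffle happens at partitionBy, while zipPartitions simply concatenates matching partitions.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object ZipPartitionsUnionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("zipPartitions-union-sketch").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical checkpoint location

    val numPartitions = 4
    // Hypothetical stand-ins for tempRDD1/tempRDD2: (key, count) pairs with duplicate keys,
    // both with the same number of partitions (required by zipPartitions).
    val tempRDD1 = sc.parallelize(Seq(("a", 1L), ("b", 1L), ("a", 1L)), numPartitions)
    val tempRDD2 = sc.parallelize(Seq(("b", 1L), ("c", 1L), ("a", 1L)), numPartitions)

    // Partition-wise union via zipPartitions: concatenates matching partitions, no shuffle.
    val unioned = tempRDD1.zipPartitions(tempRDD2, preservesPartitioning = true) {
      (iter1, iter2) => iter1 ++ iter2
    }

    // Intermediate temp RDD: one shuffle via partitionBy, then persist, checkpoint, materialize.
    val intermediateRDD = unioned
      .partitionBy(new HashPartitioner(numPartitions))
      .persist(StorageLevel.MEMORY_AND_DISK)
    intermediateRDD.checkpoint()
    intermediateRDD.count()  // forces materialization before any downstream step runs

    // Downstream step from the report: reduceByKey, then compare counts with and without distinct.
    val rdd3 = intermediateRDD.reduceByKey(_ + _)
    println(rdd3.count())            // expected: one record per key
    println(rdd3.distinct().count()) // expected: the same count as above

    sc.stop()
  }
}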