[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14735653#comment-14735653 ]
Glenn Strycker commented on SPARK-10493:
----------------------------------------

Note: so far this only seems to occur "at scale". I haven't noticed the issue in any of my unit tests, which run on the order of a few hundred records, but I am currently seeing it on an RDD with approximately 1B records. As you can see in my submit command above, I'm running on 428 executors, and inside my Spark program I request 2568 partitions (a 6x multiple), so each partition should hold about 400,000 records if partitioning by the hash of the key has low enough skew.

I'm wondering whether this reduceByKey/distinct count issue has anything to do with the zipPartitions or partitionBy steps, which occur before the reduceByKey. I tried splitting those steps into a separate RDD that materializes before the reduceByKey runs, but my counts for every step are the same :-/

> reduceByKey not returning distinct results
> ------------------------------------------
>
>                 Key: SPARK-10493
>                 URL: https://issues.apache.org/jira/browse/SPARK-10493
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Glenn Strycker
>
> I am running Spark 1.3.0 and creating an RDD by unioning several earlier
> RDDs (using zipPartitions), partitioning by a hash partitioner, and then
> applying a reduceByKey to summarize statistics by key.
> Since my set before the reduceByKey consists of records such as (K, V1),
> (K, V2), (K, V3), I expect the results after reduceByKey to be just
> (K, f(V1,V2,V3)), where the function f is appropriately associative,
> commutative, etc. Therefore, the results after reduceByKey ought to be
> distinct, correct?
> I am running counts of my RDD and finding that adding an additional
> .distinct after my .reduceByKey changes the final count!!
> Here is some example code:
> rdd3 = tempRDD1.
>   zipPartitions(tempRDD2, true)((iter, iter2) => iter ++ iter2).
>   partitionBy(new HashPartitioner(numPartitions)).
>   reduceByKey((a,b) => (math.Ordering.String.min(a._1, b._1), a._2 + b._2,
>     math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
> println(rdd3.count)
> rdd4 = rdd3.distinct
> println(rdd4.count)
> I am using persistence, checkpointing, and other things in my actual code
> that I did not paste here, so I can paste my actual code if it would be
> helpful.
> This issue may be related to SPARK-2620, except that, to my knowledge, I am
> not using case classes.
> See also
> http://stackoverflow.com/questions/32466176/apache-spark-rdd-reducebykey-operation-not-returning-correct-distinct-results


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
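The reporter's expectation can be checked locally without Spark. The sketch below (illustrative only; `records` and `reduced` are made-up names, and `groupBy`/`reduce` on a plain Scala collection merely emulate reduceByKey's per-key merge) uses the same combining function as the report: with a deterministic, associative, commutative function, the result holds exactly one record per key, so a subsequent `distinct` should not change the count.

```scala
// Emulate reduceByKey's per-key merge on a local Scala collection,
// reusing the 5-tuple combining function from the issue.
val records = Seq(
  ("K", ("a", 1L, 2, 3, 4)),
  ("K", ("b", 2L, 5, 1, 0)),
  ("K", ("c", 3L, 0, 9, 2)),
  ("J", ("d", 7L, 1, 1, 1)))

val reduced = records.groupBy(_._1).map { case (k, vs) =>
  k -> vs.map(_._2).reduce((a, b) =>
    (math.Ordering.String.min(a._1, b._1), a._2 + b._2,
     math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
}

println(reduced("K"))  // the single merged record for key "K"
// Exactly one record per key, so distinct is a no-op here:
println(reduced.size == reduced.toSeq.distinct.size)
```

If the distributed count does change after `.distinct`, the divergence must come from something this local model lacks: non-deterministic input ordering across partitions interacting with a function that is not truly associative/commutative, or value types whose equals/hashCode are inconsistent (compare SPARK-2620).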