[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14737681#comment-14737681 ]

Sean Owen commented on SPARK-10493:
-----------------------------------

If the RDD is a result of reduceByKey, I agree that the keys should be unique. 
Tuples implement equals and hashCode correctly, as does String, so that ought 
to be fine.
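
To make that invariant concrete, here is a minimal, self-contained sketch 
(hypothetical names and a local master; not the reporter's code) of why a 
.distinct after reduceByKey should not change the count when the key type's 
equals/hashCode are sound:

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyUniqueness {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("reduceByKey-check"))
    // Three records, two of which share key "k1"; reduceByKey merges the
    // values for each key into a single record.
    val pairs = sc.parallelize(Seq(("k1", 1), ("k1", 2), ("k2", 3)))
    val reduced = pairs.reduceByKey(_ + _)
    // With one record per key, distinct should be a no-op on the count.
    assert(reduced.count == reduced.distinct.count)
    sc.stop()
  }
}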

I still somewhat suspect that something is being computed twice and is not 
quite deterministic. The persist() call on rdd4 immediately before ought to 
hide that, but it is still distantly possible this is the cause: rdd4 is not 
computed and persisted before the computation of rdd5 starts, so its 
partitions might be re-evaluated during that process.

It's a bit of a long shot, but what about adding a temp4.count() for good 
measure before starting on temp5?
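
Concretely, something along these lines (a sketch only; temp4 stands in for 
your RDD, and MEMORY_AND_DISK is just a guess at the storage level you use):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Persist, then force full materialization with an action *before* building
// temp5, so a non-deterministic lineage cannot be re-evaluated while the
// downstream job runs.
def materializeFirst[K, V](temp4: RDD[(K, V)]): RDD[(K, V)] = {
  temp4.persist(StorageLevel.MEMORY_AND_DISK)
  temp4.count() // runs the job now; every partition is computed and cached
  temp4         // temp5 can now be derived without recomputing temp4
}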

> reduceByKey not returning distinct results
> ------------------------------------------
>
>                 Key: SPARK-10493
>                 URL: https://issues.apache.org/jira/browse/SPARK-10493
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Glenn Strycker
>         Attachments: reduceByKey_example_001.scala
>
>
> I am running Spark 1.3.0 and creating an RDD by unioning several earlier RDDs 
> (using zipPartitions), partitioning by a hash partitioner, and then applying 
> a reduceByKey to summarize statistics by key.
> Since my set before the reduceByKey consists of records such as (K, V1), (K, 
> V2), (K, V3), I expect the results after reduceByKey to be just (K, 
> f(V1,V2,V3)), where the function f is appropriately associative, commutative, 
> etc. Therefore, the results after reduceByKey ought to be distinct, correct? 
> I am running counts of my RDD and finding that adding an additional 
> .distinct after my .reduceByKey changes the final count!
> Here is some example code:
> val rdd3 = tempRDD1.
>   zipPartitions(tempRDD2, true)((iter, iter2) => iter ++ iter2).
>   partitionBy(new HashPartitioner(numPartitions)).
>   reduceByKey((a, b) => (math.Ordering.String.min(a._1, b._1), a._2 + b._2,
>     math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
> println(rdd3.count)
> val rdd4 = rdd3.distinct
> println(rdd4.count)
> I am using persistence, checkpointing, and other operations in my actual 
> code that I did not paste here; I can paste the full code if it would be 
> helpful.
> This issue may be related to SPARK-2620, except I am not using case classes, 
> to my knowledge.
> See also 
> http://stackoverflow.com/questions/32466176/apache-spark-rdd-reducebykey-operation-not-returning-correct-distinct-results


