[ 
https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14737001#comment-14737001
 ] 

Glenn Strycker commented on SPARK-10493:
----------------------------------------

In this example, our RDDs are partitioned with a hash partition, but are not 
ordered.

I think you may be confusing zipPartitions with zipWithIndex... zipPartitions 
is used to merge two sets partition-wise, which enables a union without 
requiring any shuffles.  We use zipPartitions throughout our code to make 
things fast, and then apply partitionBy() periodically to do the shuffles only 
when needed.  No ordering is required.  We're also not concerned with 
uniqueness at this point (in fact, for my application I want to keep 
multiplicity UNTIL the reduceByKey step), so hash collisions and such are ok 
for our zipPartition union step.

As I've been investigating this the past few days, I went ahead and made an 
intermediate temp RDD that does the zipPartitions, runs partitionBy, persists, 
checkpoints, and then materializes the RDD.  So I think this rules out that 
zipPartitions is causing the problems downstream for the main RDD, which only 
runs reduceByKey on the intermediate RDD.

> reduceByKey not returning distinct results
> ------------------------------------------
>
>                 Key: SPARK-10493
>                 URL: https://issues.apache.org/jira/browse/SPARK-10493
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Glenn Strycker
>
> I am running Spark 1.3.0 and creating an RDD by unioning several earlier RDDs 
> (using zipPartitions), partitioning by a hash partitioner, and then applying 
> a reduceByKey to summarize statistics by key.
> Since my set before the reduceByKey consists of records such as (K, V1), (K, 
> V2), (K, V3), I expect the results after reduceByKey to be just (K, 
> f(V1,V2,V3)), where the function f is appropriately associative, commutative, 
> etc.  Therefore, the results after reduceByKey ought to be distinct, correct? 
>  I am running counts of my RDD and finding that adding an additional 
> .distinct after my .reduceByKey is changing the final count!!
> Here is some example code:
> rdd3 = tempRDD1.
>    zipPartitions(tempRDD2, true)((iter, iter2) => iter++iter2).
>    partitionBy(new HashPartitioner(numPartitions)).
>    reduceByKey((a,b) => (math.Ordering.String.min(a._1, b._1), a._2 + b._2, 
> math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
> println(rdd3.count)
> rdd4 = rdd3.distinct
> println(rdd4.count)
> I am using persistence, checkpointing, and other stuff in my actual code that 
> I did not paste here, so I can paste my actual code if it would be helpful.
> This issue may be related to SPARK-2620, except I am not using case classes, 
> to my knowledge.
> See also 
> http://stackoverflow.com/questions/32466176/apache-spark-rdd-reducebykey-operation-not-returning-correct-distinct-results



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to