[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-10 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738648#comment-14738648 ]

Sean Owen commented on SPARK-10493:
---

OK, yes, I see now that temp4 is counted. I'm out of ideas. I suppose you can
use union(), in any event, to move forward. It could well be an obscure bug.
Can you try this on Spark 1.5 by any chance? There are literally thousands of
changes since 1.3.0.
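
For concreteness, a minimal sketch of that workaround, reusing the names from the report (illustrative, not tested):

{code}
// Replace the partition-order-sensitive zipPartitions step with union(),
// which yields the same logical contents regardless of partition layout.
val rdd3 = tempRDD1.union(tempRDD2).
  partitionBy(new HashPartitioner(numPartitions)).
  reduceByKey((a, b) => (math.Ordering.String.min(a._1, b._1), a._2 + b._2,
    math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
{code}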

> reduceByKey not returning distinct results
> -------------------------------------------
>
>                 Key: SPARK-10493
>                 URL: https://issues.apache.org/jira/browse/SPARK-10493
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Glenn Strycker
>         Attachments: reduceByKey_example_001.scala
>
>
> I am running Spark 1.3.0 and creating an RDD by unioning several earlier
> RDDs (using zipPartitions), partitioning by a hash partitioner, and then
> applying a reduceByKey to summarize statistics by key.
> Since my set before the reduceByKey consists of records such as (K, V1),
> (K, V2), (K, V3), I expect the results after reduceByKey to be just
> (K, f(V1,V2,V3)), where the function f is appropriately associative,
> commutative, etc. Therefore, the results after reduceByKey ought to be
> distinct, correct? I am running counts of my RDD and finding that adding an
> additional .distinct after my .reduceByKey is changing the final count!!
> Here is some example code:
> val rdd3 = tempRDD1.
>   zipPartitions(tempRDD2, true)((iter, iter2) => iter ++ iter2).
>   partitionBy(new HashPartitioner(numPartitions)).
>   reduceByKey((a, b) => (math.Ordering.String.min(a._1, b._1), a._2 + b._2,
>     math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
> println(rdd3.count)
> val rdd4 = rdd3.distinct
> println(rdd4.count)
> I am using persistence, checkpointing, and other things in my actual code
> that I did not paste here, so I can paste my actual code if it would be
> helpful.
> This issue may be related to SPARK-2620, except I am not using case
> classes, to my knowledge.
> See also
> http://stackoverflow.com/questions/32466176/apache-spark-rdd-reducebykey-operation-not-returning-correct-distinct-results






[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-10 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738829#comment-14738829 ]

Sean Owen commented on SPARK-10493:
---

Maybe union() tides you over; CDH 5.5 (= Spark 1.5) is coming in ~2 months.




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-10 Thread Glenn Strycker (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738794#comment-14738794 ]

Glenn Strycker commented on SPARK-10493:


Unfortunately we don't have anything past 1.3.0. We're using a CDH
installation and running in a semi-production environment, so we're sticking
with their most stable version for now. I like your idea, though, so if we
install 1.5 on a test cluster I'll give this a try. Thanks for the tips and help!




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-09 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736879#comment-14736879 ]

Sean Owen commented on SPARK-10493:
---

That much should be OK. 
zipPartitions only makes sense if you have two ordered, identically partitioned 
data sets. Is that true of the temp RDDs?
Otherwise that could be a source of nondeterminism.
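
To illustrate: zipPartitions pairs partitions strictly by index, so the output depends on which elements landed in which partition. A toy sketch:

{code}
// zipPartitions pairs partition 0 with partition 0, partition 1 with 1, etc.
// If the two RDDs are not identically partitioned, the pairing is arbitrary.
val left  = sc.parallelize(Seq(1, 2, 3, 4), 2)
val right = sc.parallelize(Seq(10, 20, 30, 40), 2)
val zipped = left.zipPartitions(right)((it1, it2) => it1 ++ it2)
// glom() exposes per-partition contents: Array(1, 2, 10, 20), Array(3, 4, 30, 40)
zipped.glom().collect().foreach(p => println(p.mkString(", ")))
{code}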




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-09 Thread Glenn Strycker (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736869#comment-14736869 ]

Glenn Strycker commented on SPARK-10493:


The RDD I am using has the form ((String, String), (String, Long, Long, Long, 
Long)), so the key is actually a (String, String) tuple.

Are there any sorting operations that would require implicit ordering, buried 
under the covers of the reduceByKey operation, that would be causing the 
problems with non-uniqueness?

Does partitionBy(HashPartitioner(numPartitions)) not work with a (String, 
String) tuple?  I've not had any noticeable problems with this before, although 
that would certainly explain errors in reduceByKey and distinct.
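
For what it's worth, tuple keys should hash fine, since Scala tuples have value-based equals/hashCode; a quick sketch:

{code}
// Equal (String, String) tuples hash identically, so a HashPartitioner
// routes them to the same partition.
val part = new org.apache.spark.HashPartitioner(8)
val k1 = ("cluster041", "cluster043")
val k2 = ("cluster041", "cluster043")
println(k1 == k2)                                       // true
println(part.getPartition(k1) == part.getPartition(k2)) // true
{code}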




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-09 Thread Glenn Strycker (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737001#comment-14737001 ]

Glenn Strycker commented on SPARK-10493:


In this example, our RDDs are partitioned with a hash partitioner, but are not
ordered.

I think you may be confusing zipPartitions with zipWithIndex... zipPartitions
is used to merge two sets partition-wise, which enables a union without
requiring any shuffles. We use zipPartitions throughout our code to make
things fast, and then apply partitionBy() periodically to do the shuffles only
when needed. No ordering is required. We're also not concerned with
uniqueness at this point (in fact, for my application I want to keep
multiplicity UNTIL the reduceByKey step), so hash collisions and such are OK
for our zipPartitions union step.

As I've been investigating this over the past few days, I went ahead and made
an intermediate temp RDD that does the zipPartitions, runs partitionBy,
persists, checkpoints, and then materializes the RDD. So I think this rules
out zipPartitions causing the problems downstream for the main RDD, which only
runs reduceByKey on the intermediate RDD.
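
Roughly, that intermediate step looks like this (a sketch with the ticket's names; the storage level is an assumption):

{code}
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

// Intermediate RDD: zip, repartition, persist, checkpoint, then force
// materialization with an action before the downstream reduceByKey runs.
val intermediate = tempRDD1.
  zipPartitions(tempRDD2, true)((iter, iter2) => iter ++ iter2).
  partitionBy(new HashPartitioner(numPartitions)).
  persist(StorageLevel.MEMORY_AND_DISK)
intermediate.checkpoint()
intermediate.count()  // action: computes, caches, and writes the checkpoint
{code}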




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-09 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737051#comment-14737051 ]

Sean Owen commented on SPARK-10493:
---

I think you still have the same issue with zipPartitions: unless you have an
ordering on the RDD, the partitions may not appear in any particular order, in
which case zipping them may give different results. It may still not be the
issue, though, since a lot of partitionings will happen to have the same,
assumed order anyway.

Why would this necessarily be better than union()? If you have the same number
of partitions and the same partitioning, you shouldn't have a shuffle. That's
by the by, though.

I can't reproduce this in a simple, similar local example. I think there's
something else different between what you're doing and the code snippet here.
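
On the union() point, a quick check (sketch): when both parents share the same partitioner, the union keeps it, so no shuffle is needed afterwards:

{code}
// If both inputs are hash-partitioned identically, union() preserves the
// partitioner (a partitioner-aware union under the hood) -- no shuffle.
val part = new HashPartitioner(numPartitions)
val a = tempRDD1.partitionBy(part)
val b = tempRDD2.partitionBy(part)
val u = a.union(b)
println(u.partitioner)    // Some(...) when the parents' partitioners match
println(u.toDebugString)  // lineage shows no extra ShuffledRDD for the union
{code}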







[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-09 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737730#comment-14737730 ]

Sean Owen commented on SPARK-10493:
---

checkpoint doesn't materialize the RDD, which is why it occurred to me to try a
count; I'd try that to see if it also works. If so, I do have some feeling it's
due to zipping and the ordering of partitions -- especially if union() also
seems to work.

++ just concatenates iterators; I don't think that can matter. I also don't
think the parent RDD types matter. It's not impossible there's a problem, but
there are also a lot of tests exercising reduceByKey.
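
In other words, a minimal sketch (rdd standing in for any of the intermediate RDDs):

{code}
// checkpoint() only marks the RDD; nothing is computed until an action runs.
rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
rdd.checkpoint()
rdd.count()                  // first action: computes and saves the checkpoint
println(rdd.isCheckpointed)  // true only after that action
{code}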




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-09 Thread Glenn Strycker (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737727#comment-14737727 ]

Glenn Strycker commented on SPARK-10493:


I already have that in the code I'm testing... I've been persisting,
checkpointing, and materializing all RDDs, including all intermediate steps.

I did try substituting union() for zipPartitions(), and that actually resulted
in correct values! Very weird. What's strange is that there are no differences
in my results in spark-shell or in a very small piece of test code I wrote for
spark-submit (that is, I can't replicate the original error), but this change
did fix things in my production code.

I'm trying to discover why zipPartitions isn't behaving identically to union in
my code... I posted a stackoverflow question along these lines, if you want to
read over some additional code and toDebugString results:
http://stackoverflow.com/questions/32489112/what-is-the-difference-between-union-and-zippartitions-for-apache-spark-rdds

I attempted adding some "implicit ordering" to the original code with
zipPartitions, but that didn't fix anything -- only with union did it work.

Is it possible that ShuffledRDDs (returned by union) work with reduceByKey, but
ZippedPartitionsRDD2s (returned by zipPartitions) do not?

Or is it possible that the "++" operator I am using inside the zipPartitions
function isn't compatible with my particular RDD structure, ((String, String),
(String, Long, Long, Long, Long))?

Thanks so much for your help... at this point I'm tempted to replace
zipPartitions with union everywhere in my code, just for superstition's sake.
I just want to understand WHY zipPartitions didn't work!!
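
One way to compare the two paths is to print each lineage; a sketch:

{code}
// Inspect the concrete RDD types. zipPartitions yields a ZippedPartitionsRDD2;
// union yields a UnionRDD (or a partitioner-aware union when both parents
// share a partitioner).
val viaZip   = tempRDD1.zipPartitions(tempRDD2, true)((a, b) => a ++ b)
val viaUnion = tempRDD1.union(tempRDD2)
println(viaZip.toDebugString)
println(viaUnion.toDebugString)
{code}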




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-09 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737681#comment-14737681 ]

Sean Owen commented on SPARK-10493:
---

If the RDD is the result of a reduceByKey, I agree that the keys should be
unique. Tuples implement equals and hashCode correctly, as does String, so that
ought to be fine.

I still sort of suspect something is getting computed twice and not quite
deterministically, but the persist() call on rdd4 immediately before ought to
hide that. However, it's still distantly possible this is the cause, since rdd4
is not computed and persisted before the computation of rdd5 starts, and might
see its partitions reevaluated during that process.

It's a bit of a longshot, but what about adding a temp4.count() for good
measure before starting on temp5?
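
That is, something like this (a sketch using the ticket's temp4/temp5 names; the storage level is an assumption):

{code}
// Fully compute and cache temp4 before temp5's job starts, so temp5 cannot
// observe temp4's partitions being re-evaluated mid-computation.
temp4.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
temp4.count()  // materialize first, for good measure
val temp5 = temp4.reduceByKey((a, b) => (math.Ordering.String.min(a._1, b._1),
  a._2 + b._2, math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
{code}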




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-09 Thread Glenn Strycker (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737735#comment-14737735 ]

Glenn Strycker commented on SPARK-10493:


Of course. I have count statements everywhere in order to materialize.

I usually also run RDD.sortByKey(true).collect().foreach(println) if I'm
running on a small test set, or RDD.take(100).foreach(println) if I have a
larger set, just so I can see a few values.

So I'm positive that all of my intermediate/temporary RDDs are in fact
materialized before getting to the zipPartitions/union step and the reduceByKey
step. I also monitor my jobs in Yarn and I can see the persisted RDDs as they
are being cached.




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-09 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737252#comment-14737252 ]

Sean Owen commented on SPARK-10493:
---

What do you mean that it's not collapsing key pairs? The output of temp5 shows
the same keys and the same count in both cases. The keys are distinct and in
order after {{temp5.sortByKey(true).collect().foreach(println)}}.

Here's my simplistic test case, which gives a consistent count when I run the
code above on it:

{code}
val bWords = sc.broadcast(sc.textFile("/usr/share/dict/words").collect())

val tempRDD1 = sc.parallelize(1 to 1000, 10).mapPartitionsWithIndex { (i, ns) =>
  val words = bWords.value
  val random = new scala.util.Random(i)
  ns.map { n =>
    val a = words(random.nextInt(words.length))
    val b = words(random.nextInt(words.length))
    val c = words(random.nextInt(words.length))
    val d = random.nextInt(words.length)
    val e = random.nextInt(words.length)
    val f = random.nextInt(words.length)
    val g = random.nextInt(words.length)
    ((a, b), (c, d, e, f, g))
  }
}

val tempRDD2 = sc.parallelize(1 to 1000, 10).mapPartitionsWithIndex { (i, ns) =>
  val words = bWords.value
  val random = new scala.util.Random(i)
  ns.map { n =>
    val a = words(random.nextInt(words.length))
    val b = words(random.nextInt(words.length))
    val c = words(random.nextInt(words.length))
    val d = random.nextInt(words.length)
    val e = random.nextInt(words.length)
    val f = random.nextInt(words.length)
    val g = random.nextInt(words.length)
    ((a, b), (c, d, e, f, g))
  }
}
{code}
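
For completeness, "the code above" applied to these two RDDs, as a sketch; locally both counts come out equal:

{code}
val rdd3 = tempRDD1.
  zipPartitions(tempRDD2, true)((iter, iter2) => iter ++ iter2).
  partitionBy(new org.apache.spark.HashPartitioner(10)).
  reduceByKey((a, b) => (math.Ordering.String.min(a._1, b._1), a._2 + b._2,
    math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
println(rdd3.count)
println(rdd3.distinct.count)
{code}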




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-09 Thread Glenn Strycker (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737296#comment-14737296 ]

Glenn Strycker commented on SPARK-10493:


[~srowen], the code I attached did run correctly. However, I have similar code
that I run in Yarn via spark-submit that is NOT returning one record per key.

What I mean is that when I run the spark-submit code that generates temp5, I
get a set as follows:

{noformat}
((cluster021,cluster023),(cluster021,1,2,1,3))
((cluster031,cluster033),(cluster031,1,2,1,3))
((cluster041,cluster043),(cluster041,5,2,1,3))
((cluster041,cluster043),(cluster041,1,2,1,3))
((cluster041,cluster044),(cluster041,3,2,1,3))
((cluster041,cluster044),(cluster041,4,2,1,3))
((cluster051,cluster052),(cluster051,6,2,1,3))
((cluster051,cluster053),(cluster051,1,2,1,3))
((cluster051,cluster054),(cluster051,1,2,1,3))
((cluster051,cluster055),(cluster051,1,2,1,3))
((cluster051,cluster056),(cluster051,1,2,1,3))
((cluster052,cluster053),(cluster051,1,1,1,2))
((cluster052,cluster054),(cluster051,8,1,1,2))
((cluster053,cluster054),(cluster051,7,1,1,2))
((cluster055,cluster056),(cluster051,9,1,1,2))
{noformat}

Note that the keys (cluster041,cluster043) and (cluster041,cluster044) each
have 2 records in the results, which should NEVER happen!

Here is what I expected (which is what I see in the example code I attached to
this ticket, which ran successfully in spark-shell):

{noformat}
((cluster021,cluster023),(cluster021,1,2,1,3))
((cluster031,cluster033),(cluster031,1,2,1,3))
((cluster041,cluster043),(cluster041,6,2,1,3))
((cluster041,cluster044),(cluster041,7,2,1,3))
((cluster051,cluster052),(cluster051,6,2,1,3))
((cluster051,cluster053),(cluster051,1,2,1,3))
((cluster051,cluster054),(cluster051,1,2,1,3))
((cluster051,cluster055),(cluster051,1,2,1,3))
((cluster051,cluster056),(cluster051,1,2,1,3))
((cluster052,cluster053),(cluster051,1,1,1,2))
((cluster052,cluster054),(cluster051,8,1,1,2))
((cluster053,cluster054),(cluster051,7,1,1,2))
((cluster055,cluster056),(cluster051,9,1,1,2))
{noformat}

You are right that in my example distinct is not really the issue, since the
records with the same keys do have different values. The issue is with
reduceByKey, which is NOT correctly reducing my RDDs down to one record per
key. Does reduceByKey not support (String, String) keys?
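
For reference, a tiny local sketch suggests reduceByKey handles (String, String) keys fine:

{code}
// Tuple keys group by value equality, so each distinct (String, String)
// key yields exactly one output record here.
val pairs = sc.parallelize(Seq(
  (("cluster041", "cluster043"), 5L),
  (("cluster041", "cluster043"), 1L),
  (("cluster041", "cluster044"), 3L)))
pairs.reduceByKey(_ + _).collect().foreach(println)
// ((cluster041,cluster043),6)
// ((cluster041,cluster044),3)
{code}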




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-08 Thread Glenn Strycker (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14735626#comment-14735626 ]

Glenn Strycker commented on SPARK-10493:


Thanks for the speedy follow-up, [~frosner]!

I'm attempting right now to replicate this issue in the Spark shell so that I
can find a nice minimal example and rule out issues specific to my application.
I'll update this ticket when I have a better idea of what's happening.

For now, I'm submitting via spark-submit to YARN using cluster mode. We have a
specific setup at our company, so my actual submit script includes custom
configs and such. Here's a bit of what I submit, so please let me know if you
need a particular environment variable. Adding [~aagottlieb] and [~peterwoj]
as watchers to this ticket so they can comment with appropriate additional
details.

/opt/spark/bin/spark-submit --conf spark.shuffle.service.enabled=true \
  --properties-file spark.conf --files log4j.properties --jars  \
  --class myProgram --num-executors 428 --driver-memory 25G \
  --executor-memory 20G target/scala-2.10/indid3p0_2.10-1.0.jar




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-08 Thread Glenn Strycker (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14735653#comment-14735653 ]

Glenn Strycker commented on SPARK-10493:


Note: this only seems to be occurring "at scale" so far. I haven't noticed
this issue in any of my unit tests, which are on the order of a few hundred
records, but the issue I'm currently seeing is on an RDD with approximately 1B
records. As you can see in my submit command above, I'm running on 428
executors, and inside my Spark program I am requesting 2568 partitions, which
is a 6x multiple, so each partition should have about 400,000 records if
partitioning by the hash of the key has low enough skew.

I'm wondering if my particular reduceByKey/distinct count issue has anything to
do with zipPartitions or partitionBy, which occur before the reduceByKey. I
tried splitting these steps into a separate RDD that materializes before the
reduceByKey is run, but my counts for every step are the same :-/




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-08 Thread Frank Rosner (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14735598#comment-14735598 ]

Frank Rosner commented on SPARK-10493:
--

Thanks for submitting the issue, [~glenn.strycker] :)

Can you provide a minimal example so we can try to reproduce the issue? It
should also include the submit command (or are you using the shell?).




[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results

2015-09-08 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14735772#comment-14735772 ]

Sean Owen commented on SPARK-10493:
---

There are some key pieces of info missing, like what the key and value types 
are. This could happen if they do not implement equals/hashCode correctly. It 
could also happen if your input is nondeterministic. Without a reproducible 
example with these details I don't think this is an actionable JIRA.
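
To illustrate the equals/hashCode failure mode, a hypothetical sketch (not the reporter's actual types):

{code}
// A key class WITHOUT value-based equals/hashCode: two logically equal keys
// never combine, so reduceByKey appears to emit "duplicate" keys.
class BadKey(val a: String, val b: String) extends Serializable
val rdd = sc.parallelize(Seq((new BadKey("x", "y"), 1), (new BadKey("x", "y"), 2)))
println(rdd.reduceByKey(_ + _).count())  // 2, not the expected 1
{code}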
