[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738648#comment-14738648 ]

Sean Owen commented on SPARK-10493:
-----------------------------------

OK, yes, I see now that temp4 is count-ed. I'm out of ideas. I suppose you can use union(), in any event, to move forward. It could well be an obscure bug. Can you try this on Spark 1.5 by any chance? There are literally thousands of changes since 1.3.0.

> reduceByKey not returning distinct results
> ------------------------------------------
>
>                 Key: SPARK-10493
>                 URL: https://issues.apache.org/jira/browse/SPARK-10493
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Glenn Strycker
>         Attachments: reduceByKey_example_001.scala
>
> I am running Spark 1.3.0 and creating an RDD by unioning several earlier RDDs
> (using zipPartitions), partitioning by a hash partitioner, and then applying
> a reduceByKey to summarize statistics by key.
>
> Since my set before the reduceByKey consists of records such as (K, V1), (K, V2),
> (K, V3), I expect the results after reduceByKey to be just (K, f(V1,V2,V3)), where
> the function f is appropriately associative, commutative, etc. Therefore, the
> results after reduceByKey ought to be distinct, correct?
>
> I am running counts of my RDD and finding that adding an additional .distinct
> after my .reduceByKey is changing the final count!!
>
> Here is some example code:
>
> rdd3 = tempRDD1.
>   zipPartitions(tempRDD2, true)((iter, iter2) => iter ++ iter2).
>   partitionBy(new HashPartitioner(numPartitions)).
>   reduceByKey((a,b) => (math.Ordering.String.min(a._1, b._1), a._2 + b._2,
>     math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
> println(rdd3.count)
> rdd4 = rdd3.distinct
> println(rdd4.count)
>
> I am using persistence, checkpointing, and other stuff in my actual code that
> I did not paste here, so I can paste my actual code if it would be helpful.
>
> This issue may be related to SPARK-2620, except I am not using case classes,
> to my knowledge.
> See also
> http://stackoverflow.com/questions/32466176/apache-spark-rdd-reducebykey-operation-not-returning-correct-distinct-results

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
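[Editor's note: the union() workaround Sean suggests above might look roughly like this. This is a sketch, not a verified fix; tempRDD1, tempRDD2, and numPartitions are the names from the report, and a live SparkContext is assumed.]

```scala
import org.apache.spark.HashPartitioner

// Same pipeline as the report, but with union() in place of zipPartitions().
// union() makes no assumption about how partitions line up between the two
// RDDs, so it sidesteps any nondeterminism in partition zipping.
val rdd3 = tempRDD1.
  union(tempRDD2).
  partitionBy(new HashPartitioner(numPartitions)).
  reduceByKey((a, b) => (math.Ordering.String.min(a._1, b._1), a._2 + b._2,
    math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
```

Per the later comments in this thread, this substitution produced correct counts in the reporter's production code.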
[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738829#comment-14738829 ]

Sean Owen commented on SPARK-10493:
-----------------------------------

Maybe union() tides you over; CDH 5.5 = Spark 1.5 is coming in ~2 months.
[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14738794#comment-14738794 ]

Glenn Strycker commented on SPARK-10493:
----------------------------------------

Unfortunately we don't have anything past 1.3.0. We're using a CDH installation and running on a semi-production environment, so we're going with their most stable version for now. I like your idea, though, so if we install 1.5 on a test cluster I'll give this a try. Thanks for the tips and help!
[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736879#comment-14736879 ]

Sean Owen commented on SPARK-10493:
-----------------------------------

That much should be OK. zipPartitions only makes sense if you have two ordered, identically partitioned data sets. Is that true of the temp RDDs? Otherwise that could be a source of nondeterminism.
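[Editor's note: the contract Sean describes can be sketched like this. Illustrative only; the RDD names are taken from the report and a SparkContext is assumed.]

```scala
// zipPartitions pairs partition i of the left RDD with partition i of the
// right RDD, so both RDDs must have the same number of partitions, and the
// result is only deterministic if each partition holds the same records
// every time the lineage is recomputed.
val combined = tempRDD1.zipPartitions(tempRDD2, preservesPartitioning = true) {
  (iter1, iter2) => iter1 ++ iter2 // concatenate the two partitions' records
}
```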
[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736869#comment-14736869 ]

Glenn Strycker commented on SPARK-10493:
----------------------------------------

The RDD I am using has the form ((String, String), (String, Long, Long, Long, Long)), so the key is actually a (String, String) tuple. Are there any sorting operations that would require implicit ordering, buried under the covers of the reduceByKey operation, that would be causing the problems with non-uniqueness?

Does partitionBy(HashPartitioner(numPartitions)) not work with a (String, String) tuple? I've not had any noticeable problems with this before, although that would certainly explain errors in reduceByKey and distinct.
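[Editor's note: as a sanity check on the question above, tuple keys do hash consistently. A minimal sketch, assuming a Spark classpath for HashPartitioner; the partition count 8 is arbitrary.]

```scala
import org.apache.spark.HashPartitioner

// Scala tuples (and Strings) have value-based equals and hashCode, so a
// HashPartitioner routes equal (String, String) keys to the same partition.
val k1 = ("a", "b")
val k2 = ("a", "b")
assert(k1 == k2 && k1.hashCode == k2.hashCode)

val partitioner = new HashPartitioner(8)
assert(partitioner.getPartition(k1) == partitioner.getPartition(k2))
```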
[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737001#comment-14737001 ]

Glenn Strycker commented on SPARK-10493:
----------------------------------------

In this example, our RDDs are partitioned with a hash partition, but are not ordered. I think you may be confusing zipPartitions with zipWithIndex... zipPartitions is used to merge two sets partition-wise, which enables a union without requiring any shuffles. We use zipPartitions throughout our code to make things fast, and then apply partitionBy() periodically to do the shuffles only when needed. No ordering is required. We're also not concerned with uniqueness at this point (in fact, for my application I want to keep multiplicity UNTIL the reduceByKey step), so hash collisions and such are ok for our zipPartition union step.

As I've been investigating this the past few days, I went ahead and made an intermediate temp RDD that does the zipPartitions, runs partitionBy, persists, checkpoints, and then materializes the RDD. So I think this rules out that zipPartitions is causing the problems downstream for the main RDD, which only runs reduceByKey on the intermediate RDD.
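[Editor's note: the intermediate-RDD experiment described above might look like this. A sketch only; the storage level and variable names are illustrative, and a SparkContext with a checkpoint directory is assumed.]

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

// Materialize the zipped/partitioned intermediate before reducing, so that
// any nondeterminism in recomputing the zip cannot leak into reduceByKey.
val intermediate = tempRDD1.
  zipPartitions(tempRDD2, true)((iter, iter2) => iter ++ iter2).
  partitionBy(new HashPartitioner(numPartitions)).
  persist(StorageLevel.MEMORY_AND_DISK)
intermediate.checkpoint()
intermediate.count() // an action forces evaluation; checkpoint alone is lazy

val rdd3 = intermediate.reduceByKey((a, b) => (math.Ordering.String.min(a._1, b._1),
  a._2 + b._2, math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5)))
```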
[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737051#comment-14737051 ]

Sean Owen commented on SPARK-10493:
-----------------------------------

I think you still have the same issue with zipPartitions, unless you have an ordering on the RDD, since the partitions may not appear in any particular order, in which case zipping them may give different results. It may still not be the issue though, since a lot of partitionings will happen to have the assumed, same order anyway. Why would this necessarily be better than union()? If you have the same # of partitions and same partitioning you shouldn't have a shuffle. That's also by the by.

I can't reproduce this in a simple, similar local example. I think there's something else different between what you're doing and the code snippet here.
[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737730#comment-14737730 ]

Sean Owen commented on SPARK-10493:
-----------------------------------

checkpoint doesn't materialize the RDD, which is why it occurred to me to try a count. I'd try that to see if it also works. If so I do have some feeling it's due to zipping and ordering of partitions -- especially if union() also seems to work. ++ is just concatenating iterators, I don't think that can matter. I also don't think the parent RDD types matter. It's not impossible there's a problem, but there are also a lot of tests exercising reduceByKey.
[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737727#comment-14737727 ]

Glenn Strycker commented on SPARK-10493:
----------------------------------------

I already have that added in my code that I'm testing... I've been persisting, checkpointing, and materializing all RDDs, including all intermediate steps.

I did try substituting union() for zipPartitions(), and that actually resulted in correct values! Very weird. What's strange is that there are no differences in my results on spark-shell or in a very small piece of test code I wrote to use spark-submit (that is, I can't replicate the original error), but this change did fix things in my production code.

I'm trying to discover why zipPartitions isn't behaving identically to union in my code... I posted a stackoverflow question along these lines, if you want to read over some additional code and toDebugString results:
http://stackoverflow.com/questions/32489112/what-is-the-difference-between-union-and-zippartitions-for-apache-spark-rdds

I attempted adding some "implicit ordering" to the original code with zipPartitions, but that didn't fix anything -- only using union worked. Is it possible that ShuffledRDDs (returned by union) work with reduceByKey, but ZippedPartitionsRDD2s (returned by zipPartitions) do not? Or is it possible that the "++" operator I am using inside the zipPartitions function isn't compatible with my particular RDD structure ((String, String), (String, Long, Long, Long, Long))?

Thanks so much for your help... at this point I'm tempted to replace zipPartitions with unions everywhere in my code, just for superstition's sake. I just want to understand WHY zipPartitions didn't work!!
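[Editor's note: the lineage difference Glenn is chasing can be inspected directly. A sketch; RDD names are assumed from earlier in the thread, and the RDD class names in the comments are what one would typically see, not guaranteed output.]

```scala
// Compare the lineage of the two combination strategies.  union() typically
// yields a UnionRDD (or a PartitionerAwareUnionRDD when both sides share a
// partitioner), while zipPartitions yields a ZippedPartitionsRDD2.
val viaUnion = tempRDD1.union(tempRDD2)
val viaZip   = tempRDD1.zipPartitions(tempRDD2, true)((a, b) => a ++ b)
println(viaUnion.toDebugString)
println(viaZip.toDebugString)
```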
[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737681#comment-14737681 ]

Sean Owen commented on SPARK-10493:
-----------------------------------

If the RDD is a result of reduceByKey, I agree that the keys should be unique. Tuples implement equals and hashCode correctly, as does String, so that ought to be fine.

I still sort of suspect something is getting computed twice and not quite deterministic, but the persist() call on rdd4 immediately before ought to hide that. However it's still distantly possible this is the cause, since it is not computed and persisted before computing rdd5 starts, and might see its partitions reevaluated during that process. It's a bit of a longshot, but what about adding a temp4.count() for good measure before starting on temp5?
[jira] [Commented] (SPARK-10493) reduceByKey not returning distinct results
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737735#comment-14737735 ]

Glenn Strycker commented on SPARK-10493:
----------------------------------------

Of course. I have count statements everywhere in order to materialize. I usually additionally run RDD.sortByKey(true).collect().foreach(println) if I'm running on a small test set, or RDD.take(100).foreach(println) if I have a larger set, just so I can see a few values. So I'm positive that all of my intermediate/temporary RDDs are in fact materialized before getting to the zipPartitions/union step and the reduceByKey step. I also monitor my jobs in Yarn and I can see the persisted RDDs as they are being cached.
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737252#comment-14737252 ] Sean Owen commented on SPARK-10493: --- What do you mean that it's not collapsing key pairs? The output of temp5 shows the same keys and the same count in both cases. The keys are distinct and in order after {{temp5.sortByKey(true).collect().foreach(println)}}. Here's my simplistic test case, which gives a consistent count when I run the code above on it:
{code}
val bWords = sc.broadcast(sc.textFile("/usr/share/dict/words").collect())

val tempRDD1 = sc.parallelize(1 to 1000, 10).mapPartitionsWithIndex { (i, ns) =>
  val words = bWords.value
  val random = new scala.util.Random(i)
  ns.map { n =>
    val a = words(random.nextInt(words.length))
    val b = words(random.nextInt(words.length))
    val c = words(random.nextInt(words.length))
    val d = random.nextInt(words.length)
    val e = random.nextInt(words.length)
    val f = random.nextInt(words.length)
    val g = random.nextInt(words.length)
    ((a, b), (c, d, e, f, g))
  }
}

val tempRDD2 = sc.parallelize(1 to 1000, 10).mapPartitionsWithIndex { (i, ns) =>
  val words = bWords.value
  val random = new scala.util.Random(i)
  ns.map { n =>
    val a = words(random.nextInt(words.length))
    val b = words(random.nextInt(words.length))
    val c = words(random.nextInt(words.length))
    val d = random.nextInt(words.length)
    val e = random.nextInt(words.length)
    val f = random.nextInt(words.length)
    val g = random.nextInt(words.length)
    ((a, b), (c, d, e, f, g))
  }
}
{code}
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737296#comment-14737296 ] Glenn Strycker commented on SPARK-10493: [~srowen], the code I attached did run correctly. However, I have similar code that I run in YARN via spark-submit that is NOT returning 1 record per key. I mean that when I run the code via spark-submit and generate temp5, I get a set as follows:
{noformat}
((cluster021,cluster023),(cluster021,1,2,1,3))
((cluster031,cluster033),(cluster031,1,2,1,3))
((cluster041,cluster043),(cluster041,5,2,1,3))
((cluster041,cluster043),(cluster041,1,2,1,3))
((cluster041,cluster044),(cluster041,3,2,1,3))
((cluster041,cluster044),(cluster041,4,2,1,3))
((cluster051,cluster052),(cluster051,6,2,1,3))
((cluster051,cluster053),(cluster051,1,2,1,3))
((cluster051,cluster054),(cluster051,1,2,1,3))
((cluster051,cluster055),(cluster051,1,2,1,3))
((cluster051,cluster056),(cluster051,1,2,1,3))
((cluster052,cluster053),(cluster051,1,1,1,2))
((cluster052,cluster054),(cluster051,8,1,1,2))
((cluster053,cluster054),(cluster051,7,1,1,2))
((cluster055,cluster056),(cluster051,9,1,1,2))
{noformat}
Note that the keys (cluster041,cluster043) and (cluster041,cluster044) have 2 records each in the results, which should NEVER happen! Here is what I expected (and what I see from the example code I attached to this ticket, which ran successfully in spark-shell):
{noformat}
((cluster021,cluster023),(cluster021,1,2,1,3))
((cluster031,cluster033),(cluster031,1,2,1,3))
((cluster041,cluster043),(cluster041,6,2,1,3))
((cluster041,cluster044),(cluster041,7,2,1,3))
((cluster051,cluster052),(cluster051,6,2,1,3))
((cluster051,cluster053),(cluster051,1,2,1,3))
((cluster051,cluster054),(cluster051,1,2,1,3))
((cluster051,cluster055),(cluster051,1,2,1,3))
((cluster051,cluster056),(cluster051,1,2,1,3))
((cluster052,cluster053),(cluster051,1,1,1,2))
((cluster052,cluster054),(cluster051,8,1,1,2))
((cluster053,cluster054),(cluster051,7,1,1,2))
((cluster055,cluster056),(cluster051,9,1,1,2))
{noformat}
You are right that in my example, distinct is not really the issue, since the records with the same keys do have different values. The issue is with reduceByKey, which is NOT correctly reducing my RDDs down to 1 record per key. Does reduceByKey not support (String, String) keys?
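For reference, the contract Glenn expects from reduceByKey can be modeled locally with plain Scala collections. This is a minimal sketch of the semantics only, not Spark's shuffle path; the object and helper names are made up for illustration, and the merge function is the one from the ticket:

```scala
// Local model of reduceByKey semantics for ((String, String), (String, Int, Int, Int, Int))
// records: group by key, then fold each group's values with the ticket's merge function.
object ReduceByKeyModel {
  type V = (String, Int, Int, Int, Int)

  // Merge function from the ticket: lexicographic min of the first field,
  // sum of the second, max of the remaining three.
  def merge(a: V, b: V): V =
    (math.Ordering.String.min(a._1, b._1), a._2 + b._2,
      math.max(a._3, b._3), math.max(a._4, b._4), math.max(a._5, b._5))

  // groupBy guarantees one group per distinct key, so the result has
  // exactly one record per key by construction.
  def reduceByKey(records: Seq[((String, String), V)]): Map[(String, String), V] =
    records.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduceLeft(merge) }

  def main(args: Array[String]): Unit = {
    val records = Seq(
      (("cluster041", "cluster043"), ("cluster041", 5, 2, 1, 3)),
      (("cluster041", "cluster043"), ("cluster041", 1, 2, 1, 3)),
      (("cluster041", "cluster044"), ("cluster041", 3, 2, 1, 3)),
      (("cluster041", "cluster044"), ("cluster041", 4, 2, 1, 3)))
    // One record per key: 5+1=6 and 3+4=7 in the second value field.
    reduceByKey(records).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Run against the duplicated records from the comment above, this prints one row per key with the summed counts Glenn expected, which supports his point that duplicate keys surviving a correct reduceByKey should never happen.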
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14735626#comment-14735626 ] Glenn Strycker commented on SPARK-10493: Thanks for the speedy follow-up, [~frosner]! I'm attempting right now to replicate this issue in spark-shell so that I can find a nice minimal example and rule out issues specific to my application. I'll update this ticket when I have a better idea of what's happening. For now, I'm submitting via spark-submit to YARN in cluster mode. We have a specific setup at our company, so my actual submit script includes custom configs and such. Here's the gist of what I submit; please let me know if you need a particular environment variable. Adding [~aagottlieb] and [~peterwoj] as watchers to this ticket so they can comment with appropriate additional details.
{noformat}
/opt/spark/bin/spark-submit --conf spark.shuffle.service.enabled=true --properties-file spark.conf --files log4j.properties --jars --class myProgram --num-executors 428 --driver-memory 25G --executor-memory 20G target/scala-2.10/indid3p0_2.10-1.0.jar
{noformat}
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14735653#comment-14735653 ] Glenn Strycker commented on SPARK-10493: Note: this only seems to occur "at scale" so far. I haven't noticed this issue in any of my unit tests, which are on the order of a few hundred records, but the issue I'm currently seeing is on an RDD with approximately 1B records. As you can see in my submit command above, I'm running on 428 executors, and inside my Spark program I request 2568 partitions, a 6x multiple, so each partition should hold about 400,000 records if partitioning by the hash of the key has low enough skew. I'm wondering whether my particular reduceByKey/distinct counts issue has anything to do with zipPartitions or partitionBy, which occur before the reduceByKey. I tried splitting these steps into a separate RDD that materializes before the reduceByKey runs, but my counts for every step are the same :-/
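The sizing arithmetic in the comment above checks out with a trivial sketch (the object and helper names are made up; the numbers are the ones quoted in the comment, and the estimate assumes an even hash distribution across partitions):

```scala
object PartitionSizing {
  // Expected average records per partition, assuming negligible hash skew.
  def recordsPerPartition(records: Long, executors: Int, multiple: Int): Long =
    records / (executors.toLong * multiple)

  def main(args: Array[String]): Unit = {
    println(428 * 6)                                  // 2568 partitions
    println(recordsPerPartition(1000000000L, 428, 6)) // 389408, i.e. roughly 400,000
  }
}
```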
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14735598#comment-14735598 ] Frank Rosner commented on SPARK-10493: -- Thanks for submitting the issue, [~glenn.strycker] :) Can you provide a minimal example so we can try to reproduce the issue? It should also contain the submit command (or are you using the shell?).
[ https://issues.apache.org/jira/browse/SPARK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14735772#comment-14735772 ] Sean Owen commented on SPARK-10493: --- Some key pieces of info are missing, like what the key and value types are. This could happen if they do not implement equals/hashCode correctly. It could also happen if your input is nondeterministic. Without a reproducible example with these details, I don't think this is an actionable JIRA.
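Sean's equals/hashCode point is easy to demonstrate with plain Scala collections, since groupBy relies on the same contract that Spark's hash-based shuffle does. This is an illustrative sketch; the class names are made up and are not from the ticket:

```scala
// A key class that inherits reference equality from AnyRef: two structurally
// identical instances are NOT equal and have different hash codes.
class BadKey(val a: String, val b: String)

// A case class gets compiler-generated structural equals/hashCode.
case class GoodKey(a: String, b: String)

object KeyContractDemo {
  def main(args: Array[String]): Unit = {
    // Two identical-looking BadKey instances land in separate groups...
    println(Seq(new BadKey("x", "y"), new BadKey("x", "y")).groupBy(identity).size) // 2
    // ...while case-class keys collapse into one group, as reduceByKey expects.
    println(Seq(GoodKey("x", "y"), GoodKey("x", "y")).groupBy(identity).size)       // 1
  }
}
```

A key type with broken equality would produce exactly the symptom in this ticket: multiple surviving records for keys that print identically. It does not apply here if the keys really are plain (String, String) tuples, which satisfy the contract.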