[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118397#comment-14118397 ]

Guoqiang Li commented on SPARK-3098:
------------------------------------

The following RDD operations seem to have this problem: {{zip}}, {{zipWithIndex}}, {{zipWithUniqueId}}, {{zipPartitions}}, {{groupBy}}, {{groupByKey}}.

In some cases, operation zipWithIndex get a wrong results
---------------------------------------------------------

                Key: SPARK-3098
                URL: https://issues.apache.org/jira/browse/SPARK-3098
            Project: Spark
         Issue Type: Bug
         Components: Spark Core
   Affects Versions: 1.0.1
           Reporter: Guoqiang Li
           Priority: Critical

The code to reproduce:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
c.join(c).filter(t => t._2._1 != t._2._2).take(3)
{code}
=>
{code}
Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), (36579712,(13,14)))
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118913#comment-14118913 ]

Matei Zaharia commented on SPARK-3098:
--------------------------------------

Yup, let's maybe document this for now. I'll create a JIRA for it.
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118918#comment-14118918 ]

Matei Zaharia commented on SPARK-3098:
--------------------------------------

Created SPARK-3356 to track this.
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117558#comment-14117558 ]

Ye Xianjin commented on SPARK-3098:
-----------------------------------

Hi, [~srowen] and [~gq], I think what [~matei] wants to say is that because the ordering of elements in distinct() is not guaranteed, the result of zipWithIndex is not deterministic. If you recompute an RDD containing a distinct transformation, you are not guaranteed to get the same result. That explains the behavior here. But as [~srowen] said, it's surprising to see different results from the same RDD. [~matei], what do you think about this behavior?
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117622#comment-14117622 ]

Matei Zaharia commented on SPARK-3098:
--------------------------------------

It's true that the ordering of values after a shuffle is nondeterministic, so for example on failure you might get a different order of keys in a reduceByKey or distinct or operations like that. However, I think that's the way it should be (and we can document it). RDDs are deterministic when viewed as a multiset, but not when viewed as an ordered collection, unless you do sortByKey. Operations like zipWithIndex are meant to be more of a convenience to get unique IDs, or to act on something with a known ordering (such as a text file where you want to know the line numbers).

But the freedom to control fetch ordering is quite important for performance, especially if you want to have a push-based shuffle in the future. If we wanted to get the same result every time, we could design reduce tasks to tell the master the order they fetched stuff in after the first time they ran, but even then, notice that it might limit the kind of shuffle mechanisms we allow (e.g. it would be harder to make a push-based shuffle deterministic). I'd rather not make that guarantee now.
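The multiset-vs-ordered distinction above can be sketched on plain Scala collections (a toy illustration, not code from this thread; no cluster needed): the same multiset of values, arriving in two different fetch orders, zips to different index assignments.

```scala
// Two equally valid post-shuffle orders of the same multiset of values.
val run1 = Seq(10, 20, 30)
val run2 = Seq(30, 10, 20)

// zipWithIndex assigns indices purely by position.
val idx1 = run1.zipWithIndex.toMap // 10 -> 0, 20 -> 1, 30 -> 2
val idx2 = run2.zipWithIndex.toMap // 30 -> 0, 10 -> 1, 20 -> 2

// Same multiset, different (value, index) pairs: a self-join comparing
// indices computed in two separate evaluations of the RDD can therefore
// disagree, which is the mismatch reported in this issue.
assert(run1.toSet == run2.toSet) // identical as a set
assert(idx1(10) != idx2(10))     // different as an indexed collection
```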
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14110463#comment-14110463 ]

Sean Owen commented on SPARK-3098:
----------------------------------

[~matei] The question isn't whether distinct returns a particular ordering, or whether zipWithIndex assigns particular indices, but whether they would result in the same ordering and same assignments every time the RDD is evaluated:
{code}
val c = {...}.distinct().zipWithIndex()
c.join(c).filter(t => t._2._1 != t._2._2)
{code}
If so, then the same values should map to the same indices, and the self-join of c to itself should always pair the same value with itself. Regardless of what those un-guaranteed values are, they should be the same since it's the very same RDD. If not, that obviously explains the behavior.

The behavior at first glance had also surprised me, since I had taken RDDs to be deterministic and transparently recomputable on demand. That is the important first question -- is that supposed to be so or not?
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14110183#comment-14110183 ]

Matei Zaharia commented on SPARK-3098:
--------------------------------------

Sorry, I don't understand -- what exactly is the bug here? There's no guarantee about the ordering of elements in distinct(). If you're relying on zipWithIndex creating specific values, that's a wrong assumption to make. The question is just whether the *set* of elements returned by zipWithIndex is correct.

I don't think we should change our randomize() to be more deterministic here just because you want zipWithIndex. We have to allow shuffle fetches to occur in a random order, or else we can get inefficiency when there are hotspots. If you'd like to make sure values land in specific partitions and in a specific order in each partition, you can partition the data with your own Partitioner, and run a mapPartitions that sorts them within each one.
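The Partitioner-plus-sort suggestion above could be sketched roughly as follows. This is an illustrative sketch, not code from the thread; it assumes a live SparkContext {{sc}} and arbitrarily picks 8 partitions.

```scala
import org.apache.spark.HashPartitioner

// Sketch: pin down both partition placement and within-partition order
// yourself, so a recomputation reproduces the same layout.
val pairs = sc.parallelize(1 to 1000).map(x => (x, x))
val laidOut = pairs
  .partitionBy(new HashPartitioner(8))              // each key always lands in the same partition
  .mapPartitions(iter => iter.toArray.sortBy(_._1).iterator,
                 preservesPartitioning = true)      // fixed order within each partition
// With placement and order pinned down, zipWithIndex assigns the same
// index to the same element on every evaluation.
val indexed = laidOut.keys.zipWithIndex()
```

The cost is an extra shuffle plus a per-partition sort, which is precisely the work the fetch-order freedom otherwise avoids.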
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102044#comment-14102044 ]

Sean Owen commented on SPARK-3098:
----------------------------------

It would be helpful if you would explain what you are trying to reproduce here; this is just code, and there's not a continuous value here, for example. It looks like you're producing overlapping sequences of numbers like 1..1, 6001..16000, ... and then flattening and removing duplicates, to get the range 1..47404000. That's zipped with its index to get these as (n, n-1) pairs. Then you map the second element to a String, and do the same to a subset of the data, join them, and see if there are any mismatches, because there shouldn't be. All the keys and values are unique. But why would this demonstrate something about zipWithIndex more directly than a test of the RDD c? More importantly, I ran this locally and got an empty Array, as expected.

In some cases, operation zipWithIndex get a wrong results
---------------------------------------------------------

                Key: SPARK-3098
                URL: https://issues.apache.org/jira/browse/SPARK-3098
            Project: Spark
         Issue Type: Bug
         Components: Spark Core
   Affects Versions: 1.0.1
           Reporter: Guoqiang Li
           Priority: Critical

I do not know how to reproduce the bug. This is the case: when I was operating on 10 billion records with groupByKey, the results were wrong:
{noformat}
(4696501, 370568)
(4696501, 376672)
(4696501, 374880)
.
(4696502, 350264)
(4696502, 358458)
(4696502, 398502)
..
{noformat}
=>
{noformat}
(4696501,ArrayBuffer(350264, 358458, 398502 )),
(4696502,ArrayBuffer(376621, ..))
{noformat}
Code:
{code}
val dealOuts = clickPreferences(sc, dealOutPath, periodTime)
val dealOrders = orderPreferences(sc, dealOrderPath, periodTime)
val favorites = favoritePreferences(sc, favoritePath, periodTime)
val allBehaviors = (dealOrders ++ favorites ++ dealOuts)
val peferences = allBehaviors.groupByKey().map { ...
}
{code}
spark-defaults.conf:
{code}
spark.default.parallelism 280
{code}
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102101#comment-14102101 ]

Guoqiang Li commented on SPARK-3098:
------------------------------------

It seems that {{zipWithUniqueId}} also has this issue:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithUniqueId()
c.join(c).filter(t => t._2._1 != t._2._2).take(3)
{code}
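For context, a plain-Scala sketch (not from the thread) of the id scheme zipWithUniqueId documents: the k-th item of partition p gets id p + k * n for n partitions, so the id an element receives depends entirely on which partition, and at which position, it lands after the shuffle.

```scala
// Simulate zipWithUniqueId's documented id scheme on plain Scala
// sequences standing in for an RDD's partitions.
def uniqueIds[T](partitions: Seq[Seq[T]]): Seq[(T, Long)] = {
  val n = partitions.length
  partitions.zipWithIndex.flatMap { case (part, p) =>
    part.zipWithIndex.map { case (elem, k) => (elem, p.toLong + k.toLong * n) }
  }
}

// The same multiset split differently across partitions gets different ids:
val idsA = uniqueIds(Seq(Seq(10, 20), Seq(30))).toMap // 10 -> 0, 20 -> 2, 30 -> 1
val idsB = uniqueIds(Seq(Seq(30), Seq(10, 20))).toMap // 30 -> 0, 10 -> 1, 20 -> 3
// So a nondeterministic distinct() upstream means nondeterministic ids,
// and a self-join comparing ids from two evaluations can show mismatches.
```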
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102145#comment-14102145 ]

Sean Owen commented on SPARK-3098:
----------------------------------

Yes, I get the same result with Spark 1.0.0 with patches, including the fix for SPARK-2043, in standalone mode:
{code}
Array[(Int, (Long, Long))] = Array((9272040,(13,14)), (9985320,(14,13)), (32797680,(24,26)))
{code}
If I change the code above so that the ranges are not overlapping to begin with, and remove distinct(), I don't see the issue. It also goes away if the RDD c is cached.

I would assume distinct() is deterministic, even if it doesn't guarantee an ordering. Same with zipWithIndex(). Either those assumptions are wrong, or it could be an issue in either place. A quick check says most keys are correct (no mismatch), and the mismatches are generally small. This makes me wonder if there's some kind of race condition in handing out numbers. I'll look at the code too.
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102187#comment-14102187 ]

Guoqiang Li commented on SPARK-3098:
------------------------------------

[~srowen] The following code also has this issue:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct()
c.zip(c).filter(t => t._1 != t._2).take(3)
{code}
{{distinct}} seems to be the problem.
[jira] [Commented] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102279#comment-14102279 ]

Guoqiang Li commented on SPARK-3098:
------------------------------------

This issue is caused by this code:

[BlockFetcherIterator.scala#L221|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala#L221]
{noformat}
fetchRequests ++= Utils.randomize(remoteRequests)
{noformat}
<= [ShuffledRDD.scala#L65|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L65]
{noformat}
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)
{noformat}
<= [PairRDDFunctions.scala#L100|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L100]
{noformat}
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
  .setSerializer(serializer)
partitioned.mapPartitionsWithContext((context, iter) => {
  new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
{noformat}
<= [PairRDDFunctions.scala#L163|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L163]
{noformat}
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}
{noformat}
<= [RDD.scala#L288|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L288]
{noformat}
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
{noformat}