[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14103985#comment-14103985 ]

Guoqiang Li edited comment on SPARK-3098 at 8/20/14 3:20 PM:
--------------------------------------------------------------

To solve this bug, we may have to re-implement {{BasicBlockFetcherIterator}}.

was (Author: gq):
To solve this bug. Possible to re-implement {{BasicBlockFetcherIterator}}

In some cases, operation zipWithIndex get a wrong results
----------------------------------------------------------

Key: SPARK-3098
URL: https://issues.apache.org/jira/browse/SPARK-3098
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.1
Reporter: Guoqiang Li
Priority: Critical

The code to reproduce:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
c.join(c).filter(t => t._2._1 != t._2._2).take(3)
{code}
=>
{code}
Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), (36579712,(13,14)))
{code}

--
This message was sent by Atlassian JIRA
(v6.2#6252)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14103985#comment-14103985 ]

Guoqiang Li edited comment on SPARK-3098 at 8/20/14 3:21 PM:
--------------------------------------------------------------

To solve this bug, we may have to re-implement {{BasicBlockFetcherIterator}} or {{ZippedWithIndexRDD}}.

was (Author: gq):
To solve this bug. we may have to re-implement {{BasicBlockFetcherIterator}}
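
For readers wondering why {{ZippedWithIndexRDD}} is a candidate for the fix: the RDD documentation describes the indices produced by zipWithIndex as ordered first by partition index and then by the position of each item within its partition. The following is a minimal plain-Scala sketch of that numbering scheme, not Spark's actual code. The object name, the function names, and the sample partitions are all hypothetical and only illustrate that an index depends on how many items come before an element and on where the element sits inside its partition.

```scala
object ZipWithIndexSketch {
  // Start offset of each partition = total size of all earlier partitions.
  def startIndices(partitions: Seq[Seq[Int]]): Seq[Long] =
    partitions.map(_.size.toLong).scanLeft(0L)(_ + _).init

  // Number every element: partition start offset + position inside the partition.
  def zipWithIndex(partitions: Seq[Seq[Int]]): Seq[Seq[(Int, Long)]] =
    partitions.zip(startIndices(partitions)).map { case (part, start) =>
      part.zipWithIndex.map { case (value, i) => (value, start + i) }
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical data: three partitions of sizes 2, 1 and 2.
    val partitions = Seq(Seq(10, 20), Seq(30), Seq(40, 50))
    zipWithIndex(partitions).flatten.foreach(println)
  }
}
```

Under this scheme, if the items inside a partition arrive in a different order when the partition is recomputed, the same element receives a different index, which would be consistent with the mismatched pairs in the reported output.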
[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102024#comment-14102024 ]

Guoqiang Li edited comment on SPARK-3098 at 8/19/14 8:55 AM:
--------------------------------------------------------------

The (id, value) pairs are generated by zipWithIndex.

The code to reproduce:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
val e = c.map(t => (t._1, t._2.toString))
// "<" is assumed here; the comparison operator is missing in the archived message
val d = c.filter(t => t._1 % 100 < 5)
e.join(d).filter(t => t._2._1 != t._2._2.toString).take(3)
{code}

was (Author: gq):
the (id, value) pairs are generated there zipWithIndex.
Reproduce the code:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
val e = c.map(t => (t._1, t._2.toString))
// "<" is assumed here; the comparison operator is missing in the archived message
val d = c.filter(t => t._1 % 100 < 5)
e.join(d).filter(t => t._2._1 != t._2._2.toString).take(3)
{code}

In some cases, operation zipWithIndex get a wrong results
----------------------------------------------------------

Key: SPARK-3098
URL: https://issues.apache.org/jira/browse/SPARK-3098
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.0.1
Reporter: Guoqiang Li
Priority: Critical

I do not know how to reproduce the bug. This is the case: when I ran groupByKey over 10 billion records, the results were wrong:
{noformat}
(4696501, 370568)
(4696501, 376672)
(4696501, 374880)
...
(4696502, 350264)
(4696502, 358458)
(4696502, 398502)
...
{noformat}
=>
{noformat}
(4696501,ArrayBuffer(350264, 358458, 398502 ...)), (4696502,ArrayBuffer(376621, ...))
{noformat}
Code:
{code}
val dealOuts = clickPreferences(sc, dealOutPath, periodTime)
val dealOrders = orderPreferences(sc, dealOrderPath, periodTime)
val favorites = favoritePreferences(sc, favoritePath, periodTime)
val allBehaviors = (dealOrders ++ favorites ++ dealOuts)
val peferences= allBehaviors.groupByKey().map { ...
}
{code}
spark-defaults.conf:
{code}
spark.default.parallelism 280
{code}
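
To make the symptom in the description concrete, here is a small plain-Scala sketch of the grouping one would expect from the six sample pairs quoted above. It does not use Spark; {{groupByKey}} below is a hypothetical local helper built on the standard collection {{groupBy}}, and the object name is made up for illustration.

```scala
object GroupByKeyExpectation {
  // Local stand-in for groupByKey: collect all values that share a key.
  def groupByKey(pairs: Seq[(Int, Int)]): Map[Int, Seq[Int]] =
    pairs.groupBy(_._1).map { case (key, kvs) => (key, kvs.map(_._2)) }

  def main(args: Array[String]): Unit = {
    // The six (id, value) pairs listed in the issue description.
    val pairs = Seq(
      (4696501, 370568), (4696501, 376672), (4696501, 374880),
      (4696502, 350264), (4696502, 358458), (4696502, 398502))
    groupByKey(pairs).toSeq.sortBy(_._1).foreach(println)
  }
}
```

With correct ids, the values 350264, 358458 and 398502 belong to key 4696502. In the reported output they appear under 4696501 instead, which is what one would see if the ids assigned by zipWithIndex had shifted between evaluations.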
[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102024#comment-14102024 ]

Guoqiang Li edited comment on SPARK-3098 at 8/19/14 8:58 AM:
--------------------------------------------------------------

The (id, value) pairs are generated by zipWithIndex.

The code to reproduce:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
val e = c.map(t => (t._1, t._2.toString))
// "<" is assumed here; the comparison operator is missing in the archived message
val d = c.filter(t => t._1 % 100 < 5)
e.join(d).filter(t => t._2._1 != t._2._2.toString).take(3)
{code}

was (Author: gq):
the (id, value) pairs are generated there zipWithIndex.
The reproduce code:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
val e = c.map(t => (t._1, t._2.toString))
// "<" is assumed here; the comparison operator is missing in the archived message
val d = c.filter(t => t._1 % 100 < 5)
e.join(d).filter(t => t._2._1 != t._2._2.toString).take(3)
{code}
[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102064#comment-14102064 ]

Guoqiang Li edited comment on SPARK-3098 at 8/19/14 9:41 AM:
--------------------------------------------------------------

Our cluster runs on YARN. You can try the following code in cluster mode:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
c.join(c).filter(t => t._2._1 != t._2._2).take(3)
{code}
=>
{code}
Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), (36579712,(13,14)))
{code}

was (Author: gq):
We cluster on yarn. You can try the following code in cluster mode
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
val e = c.map(t => (t._1, t._2))
e.join(e).filter(t => t._2._1 != t._2._2).take(3)
{code}
=>
{code}
Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), (36579712,(13,14)))
{code}
[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex get a wrong results
[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102279#comment-14102279 ]

Guoqiang Li edited comment on SPARK-3098 at 8/19/14 3:02 PM:
--------------------------------------------------------------

This issue is caused by the following code:

[BlockFetcherIterator.scala#L221|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala#L221]
{noformat}
fetchRequests ++= Utils.randomize(remoteRequests)
{noformat}
=> [ShuffledRDD.scala#L65|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L65]
{noformat}
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)
{noformat}
=> [PairRDDFunctions.scala#L100|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L100]
{noformat}
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
  .setSerializer(serializer)
partitioned.mapPartitionsWithContext((context, iter) => {
  new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
{noformat}
=> [PairRDDFunctions.scala#L163|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L163]
{noformat}
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}
{noformat}
=> [RDD.scala#L288|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L288]
{noformat}
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
{noformat}
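
The call chain above suggests a mechanism: {{distinct()}} goes through a shuffle, the shuffle fetches remote blocks in a randomized order, and so the order of records inside a partition may differ each time that partition is recomputed. The following plain-Scala sketch imitates that effect without Spark. Everything in it is an assumption made for illustration: {{readPartition}}, {{index}}, {{mismatches}}, the block contents, and the two fixed fetch orders that stand in for the random order.

```scala
object FetchOrderDemo {
  // Hypothetical reduce-side partition: blocks are concatenated in the order they are fetched.
  def readPartition(blocks: Seq[Seq[Int]], fetchOrder: Seq[Int]): Seq[Int] =
    fetchOrder.flatMap(i => blocks(i))

  // Assign each record its position, as zipWithIndex does inside one partition.
  def index(records: Seq[Int]): Map[Int, Long] =
    records.zipWithIndex.map { case (value, i) => (value, i.toLong) }.toMap

  // Self-join on the key and keep the keys whose two indices disagree.
  def mismatches(a: Map[Int, Long], b: Map[Int, Long]): Seq[(Int, (Long, Long))] =
    a.toSeq.collect { case (k, i) if b(k) != i => (k, (i, b(k))) }.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val blocks = Seq(Seq(6001, 12001), Seq(18001), Seq(24001, 30001))
    // Two evaluations of the same partition with different fetch orders.
    val firstRun = readPartition(blocks, Seq(0, 1, 2))
    val secondRun = readPartition(blocks, Seq(2, 0, 1))
    println(mismatches(index(firstRun), index(secondRun)))
    // Sorting before indexing removes the dependence on fetch order.
    println(mismatches(index(firstRun.sorted), index(secondRun.sorted)))
  }
}
```

In this sketch every key gets a different index across the two runs, while sorting the records before indexing makes the two runs agree. Whether imposing a deterministic order before zipWithIndex is an acceptable fix in Spark itself is not settled in this thread; the sketch only illustrates why the order matters.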