[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102279#comment-14102279 ]
Guoqiang Li commented on SPARK-3098:
------------------------------------

This issue is caused by the following code path:

[BlockFetcherIterator.scala#L221|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala#L221]
{noformat}
fetchRequests ++= Utils.randomize(remoteRequests)
{noformat}
<=> [ShuffledRDD.scala#L65|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L65]
{noformat}
SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)
{noformat}
<=> [PairRDDFunctions.scala#L100|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L100]
{noformat}
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
  .setSerializer(serializer)
partitioned.mapPartitionsWithContext((context, iter) => {
  new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
}, preservesPartitioning = true)
{noformat}
<=> [PairRDDFunctions.scala#L163|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L163]
{noformat}
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}
{noformat}
<=> [RDD.scala#L288|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L288]
{noformat}
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
{noformat}

> In some cases, operation zipWithIndex get a wrong results
> ----------------------------------------------------------
>
>                 Key: SPARK-3098
>                 URL: https://issues.apache.org/jira/browse/SPARK-3098
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.1
>            Reporter: Guoqiang Li
>            Priority: Critical
>
> Code to reproduce:
> {code}
> val c = sc.parallelize(1 to 7899).flatMap { i =>
>   (1 to 10000).toSeq.map(p => i * 6000 + p)
> }.distinct().zipWithIndex()
> c.join(c).filter(t => t._2._1 != t._2._2).take(3)
> {code}
> =>
> {code}
> Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), (36579712,(13,14)))
> {code}

--
This message was sent by Atlassian JIRA
(v6.2#6252)
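
The call chain traced in the comment can be illustrated without Spark. The sketch below is a simplified model, not Spark's implementation: `blocks`, `indexAfterFetch`, and `ShuffleOrderSketch` are hypothetical names, and first-seen de-duplication stands in for the combiner used by distinct(). It only shows that when the element order inside a partition depends on the order in which shuffle blocks arrive, numbering the elements afterwards can give the same value different indices on different evaluations, which is the symptom in the reproduce code.

```scala
object ShuffleOrderSketch {
  // Hypothetical input of one reduce partition: three remote shuffle blocks.
  val blocks: Seq[Seq[Int]] = Seq(Seq(5, 1, 9), Seq(1, 7, 3), Seq(9, 2, 5))

  // Reads the blocks in the given order, de-duplicates (a stand-in for distinct()),
  // then numbers the surviving values (a stand-in for zipWithIndex()).
  // With sortFirst = true the values are sorted before numbering, so the
  // arrival order of the blocks no longer affects the result.
  def indexAfterFetch(order: Seq[Int], sortFirst: Boolean): Map[Int, Long] = {
    val fetched = order.flatMap(i => blocks(i))
    val unique = if (sortFirst) fetched.distinct.sorted else fetched.distinct
    unique.zipWithIndex.map { case (value, idx) => (value, idx.toLong) }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Every possible arrival order of the three blocks.
    val orders = blocks.indices.permutations.toSeq
    val unsorted = orders.map(o => indexAfterFetch(o, sortFirst = false)).distinct
    val sorted = orders.map(o => indexAfterFetch(o, sortFirst = true)).distinct
    println(s"index assignments seen without sorting: ${unsorted.size}")
    println(s"index assignments seen with sorting: ${sorted.size}")
  }
}
```

In this model, fixing the order before numbering removes the dependence on arrival order. Whether an equivalent step (for example sorting before zipWithIndex) is an acceptable workaround in Spark itself is an assumption and is not stated in this thread.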