[ 
https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102279#comment-14102279
 ] 

Guoqiang Li commented on SPARK-3098:
------------------------------------

this issue caused by the code: 
[BlockFetcherIterator.scala#L221|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala#L221]
 
{noformat}fetchRequests ++= Utils.randomize(remoteRequests){noformat}
 
<=>[ShuffledRDD.scala#L65|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L65]{noformat}SparkEnv.get.shuffleFetcher.fetch[P](shuffledId,
 split.index, context, ser){noformat}
<=>
[PairRDDFunctions.scala#L100|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L100]
{noformat}
 val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
        .setSerializer(serializer)
      partitioned.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, 
aggregator.combineCombinersByKey(iter, context))
      }, preservesPartitioning = true)
{noformat}
<=>
[PairRDDFunctions.scala#L163|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L163]
{noformat}
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }
{noformat}
<=>

[RDD.scala#L288|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L288]

{noformat}
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

{noformat}

>  In some cases, operation zipWithIndex get a wrong results
> ----------------------------------------------------------
>
>                 Key: SPARK-3098
>                 URL: https://issues.apache.org/jira/browse/SPARK-3098
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.1
>            Reporter: Guoqiang Li
>            Priority: Critical
>
> The reproduce code:
> {code}
>      val c = sc.parallelize(1 to 7899).flatMap { i =>
>       (1 to 10000).toSeq.map(p => i * 6000 + p)
>     }.distinct().zipWithIndex() 
>     c.join(c).filter(t => t._2._1 != t._2._2).take(3)
> {code}
>  => 
> {code}
>  Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), 
> (36579712,(13,14)))
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to