Github user sddyljsx commented on the issue:

    https://github.com/apache/spark/pull/21859
  
    I read the source code again.
    The RangePartitioner[K, V] in ShuffleExchangeExec is an instance of RangePartitioner[InternalRow, Null]. RangePartitioner only samples K to compute the rangeBounds, so we can capture the InternalRows while sampling.
    After the RangePartitioner is built, ShuffleExchangeExec maps each InternalRow to [partitionId, InternalRow] for the shuffle (the RangePartitioner generates the partitionId).
    The shuffle itself does not use the RangePartitioner; it uses PartitionIdPassthrough instead.
    In other words, the ShuffleWriter never knows the RangePartitioner exists:
    
    ```
    // In ShuffleExchangeExec: every row is tagged with its partition id up front.
    val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] =
      newRdd.mapPartitionsInternal { iter =>
        val getPartitionKey = getPartitionKeyExtractor()
        val mutablePair = new MutablePair[Int, InternalRow]()
        iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
      }

    // The dependency shuffles on the precomputed partition id, not on the row itself.
    val dependency =
      new ShuffleDependency[Int, InternalRow, InternalRow](
        rddWithPartitionIds,
        new PartitionIdPassthrough(part.numPartitions),
        serializer)

    // PartitionIdPassthrough just returns the key, which already is the partition id.
    private class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
      override def getPartition(key: Any): Int = key.asInstanceOf[Int]
    }
    ```
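    To make the flow concrete, here is a minimal, runnable RDD-level analogue (a sketch only, not the SQL internals): the partition ids are computed up front with a RangePartitioner, and the shuffle itself only ever sees those ids.
    
    ```
    import org.apache.spark.{Partitioner, RangePartitioner, SparkContext}
    
    object PassthroughSketch {
      // Same idea as Spark's private PartitionIdPassthrough: the key already
      // *is* the partition id, so getPartition is just a cast.
      class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
        override def getPartition(key: Any): Int = key.asInstanceOf[Int]
      }
    
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[*]", "passthrough-sketch")
        val data = sc.parallelize(Seq(5 -> "e", 1 -> "a", 3 -> "c"), numSlices = 2)
    
        // Step 1: building the RangePartitioner samples only the keys.
        val rangePart = new RangePartitioner(2, data)
    
        // Step 2: tag each record with its partition id before the shuffle.
        val withIds = data.map { case (k, v) => (rangePart.getPartition(k), (k, v)) }
    
        // Step 3: shuffle with the pass-through partitioner; the ShuffleWriter
        // never sees the RangePartitioner itself.
        val shuffled = withIds.partitionBy(new PartitionIdPassthrough(rangePart.numPartitions))
        shuffled.collect().foreach(println)
        sc.stop()
      }
    }
    ```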
    
    The optimization parallelizes the cached InternalRows into newRdd instead of computing them again.
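    Conceptually the reuse looks something like this (a hedged sketch; `reuseSampledRows` and `sampledRows` are hypothetical names, and the real change works on InternalRow inside ShuffleExchangeExec):
    
    ```
    import scala.reflect.ClassTag
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    
    // Hypothetical sketch: rows already materialized while sampling for
    // rangeBounds are parallelized back into an RDD instead of re-evaluating
    // the child plan to produce them again.
    def reuseSampledRows[T: ClassTag](sc: SparkContext, sampledRows: Seq[T], numSlices: Int): RDD[T] =
      sc.parallelize(sampledRows, numSlices)
    ```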
    
    But in other places, such as RDD's sortByKey, it works differently:
    
    ```
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
        : RDD[(K, V)] = self.withScope
    {
      // The RangePartitioner itself is handed to the ShuffledRDD.
      val part = new RangePartitioner(numPartitions, self, ascending)
      new ShuffledRDD[K, V, V](self, part)
        .setKeyOrdering(if (ascending) ordering else ordering.reverse)
    }

    // getDependencies in ShuffledRDD: the shuffle consumes the partitioner directly.
    override def getDependencies: Seq[Dependency[_]] = {
      val serializer = userSpecifiedSerializer.getOrElse {
        val serializerManager = SparkEnv.get.serializerManager
        if (mapSideCombine) {
          serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
        } else {
          serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
        }
      }
      List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
    }
    ```
    Here the RDD is [K, V], and the shuffle uses the RangePartitioner directly. But sampling only gives us K, so we cannot restore the RDD from the cache.
    
    The two paths work in different ways.
    
    So for now the optimization only works in Spark SQL's ShuffleExchangeExec.
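    For contrast, a runnable sketch of the RDD path (illustration only): the RangePartitioner is handed to the shuffle itself, so partition ids are computed at shuffle time from the full records, while the earlier sampling only saw the keys.
    
    ```
    import org.apache.spark.{RangePartitioner, SparkContext}
    
    val sc = new SparkContext("local[*]", "sortByKey-sketch")
    val pairs = sc.parallelize(Seq(5 -> "e", 1 -> "a", 3 -> "c"))
    // Building the partitioner samples pairs.map(_._1), i.e. the keys only,
    // so the values are never captured during sampling.
    val part = new RangePartitioner(2, pairs)
    // sortByKey boils down to a ShuffledRDD over `part` with a key ordering;
    // repartitionAndSortWithinPartitions shows the same shape with public API.
    val sorted = pairs.repartitionAndSortWithinPartitions(part)
    sorted.collect().foreach(println)
    sc.stop()
    ```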
    
    'The ShuffleWriter should treat RangePartitioner specially and consume the sampled data in RangePartitioner instead of the input iterator.' This is a good idea; maybe we can cache both the K and V during sampling. I will give it a try.
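    A rough sketch of what that could look like (hypothetical helper, not existing Spark API): sample whole (K, V) records, so the keys still drive the rangeBounds while the values stay available for reuse.
    
    ```
    import org.apache.spark.rdd.RDD
    
    // Hypothetical: keep the full (K, V) records while sampling, instead of
    // mapping to keys first. `sampleKeysAndValues` is an assumed name.
    def sampleKeysAndValues[K, V](rdd: RDD[(K, V)], fraction: Double): Array[(K, V)] = {
      // keys can still feed rangeBounds; values are cached for later reuse
      rdd.sample(withReplacement = false, fraction).collect()
    }
    ```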

