Github user sddyljsx commented on the issue: https://github.com/apache/spark/pull/21859

I read the source code again. The RangePartitioner[K, V] in ShuffleExchangeExec is an instance of RangePartitioner[InternalRow, Null]. RangePartitioner only samples K to compute the rangeBounds, so in this case we get the whole InternalRow when sampling.

After the RangePartitioner is built, ShuffleExchangeExec maps each InternalRow to (partitionId, InternalRow) for the shuffle, and the RangePartitioner generates the partitionId. The shuffle itself doesn't use the RangePartitioner. It uses PartitionIdPassthrough instead. In other words, the ShuffleWriter doesn't know the RangePartitioner exists.

```
// Tag every row with the partition ID chosen by the RangePartitioner (`part`).
val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = newRdd.mapPartitionsInternal { iter =>
  val getPartitionKey = getPartitionKeyExtractor()
  val mutablePair = new MutablePair[Int, InternalRow]()
  iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
}

// The shuffle only sees PartitionIdPassthrough, which reads the precomputed ID back.
val dependency = new ShuffleDependency[Int, InternalRow, InternalRow](
  rddWithPartitionIds,
  new PartitionIdPassthrough(part.numPartitions),
  serializer)

private class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}
```

The optimization parallelizes the cached InternalRows into newRdd instead of computing them again.
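The two-step flow above can be illustrated without Spark. This is a minimal plain-Scala sketch, not Spark's code. SimpleRangePartitioner, IdPassthrough and RangeThenPassthrough.shuffle are hypothetical names. The evenly spaced bound selection is a simplifying assumption and differs from how Spark's RangePartitioner picks its bounds. The sketch shows that the range partitioner is consulted only when rows are tagged with partition IDs. The partitioner the "shuffle" uses just reads that ID back.

```scala
// Hypothetical stand-in for a range partitioner: bounds come from a sample of keys.
final class SimpleRangePartitioner(sampleKeys: Seq[Int], val numPartitions: Int) {
  require(numPartitions >= 1 && sampleKeys.nonEmpty)

  // numPartitions - 1 upper bounds taken at evenly spaced positions of the sorted sample.
  private val rangeBounds: Vector[Int] = {
    val sorted = sampleKeys.sorted.toVector
    (1 until numPartitions).map(i => sorted(i * sorted.length / numPartitions)).toVector
  }

  // Index of the first bound >= key; keys above every bound go to the last partition.
  def getPartition(key: Int): Int = {
    val idx = rangeBounds.indexWhere(bound => key <= bound)
    if (idx == -1) numPartitions - 1 else idx
  }
}

// Same idea as PartitionIdPassthrough: the key already is the partition ID.
final class IdPassthrough(val numPartitions: Int) {
  def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

object RangeThenPassthrough {
  // Step 1 tags rows with IDs from the range partitioner (like rddWithPartitionIds).
  // Step 2 buckets them using only the passthrough partitioner, as the shuffle would.
  def shuffle(rows: Seq[Int], numPartitions: Int): Map[Int, Seq[Int]] = {
    val part = new SimpleRangePartitioner(rows, numPartitions)
    val withIds = rows.map(row => (part.getPartition(row), row))
    val passthrough = new IdPassthrough(part.numPartitions)
    withIds
      .groupBy { case (id, _) => passthrough.getPartition(id) }
      .map { case (pid, pairs) => pid -> pairs.map(_._2) }
  }
}
```

For example, `RangeThenPassthrough.shuffle(Seq(42, 7, 19, 88, 3, 55, 61, 30), 3)` picks the bounds 19 and 55. It then returns partition 0 -> (7, 19, 3), partition 1 -> (42, 55, 30) and partition 2 -> (88, 61). The bucketing step never touches SimpleRangePartitioner. This is why the sampled rows can be handed to the shuffle directly.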
But other places work differently, for example RDD's sortByKey:

```
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

// getDependencies function in ShuffledRDD
override def getDependencies: Seq[Dependency[_]] = {
  val serializer = userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
    } else {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
    }
  }
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
```

Here the RDD is RDD[(K, V)] and the shuffle uses the RangePartitioner directly. Because only K is collected during sampling, we can't restore the RDD from the cache. The two paths work in different ways, so for now the optimization only works in Spark SQL's ShuffleExchangeExec.

> The ShuffleWriter should treat RangePartitioner specially and consume the sampled data in RangePartitioner instead of the input iterator.

This idea is good. Maybe we can cache both K and V when sampling. I will give it a try.
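Here is one way "cache both K and V when sampling" might look. This is a minimal sketch that assumes per-partition reservoir sampling. PairSample and PairReservoir are hypothetical names, not Spark APIs. The reservoir keeps whole (K, V) records, so the keys are still available for computing range bounds. When the reservoir saw no more records than it can hold (`isComplete`), it contains the entire input and could in principle be reused instead of reading the data again.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical result of sampling whole (K, V) records rather than keys only.
final case class PairSample[K, V](records: Vector[(K, V)], seen: Long, sampleSize: Int) {
  // The keys are all that range-bound computation needs.
  def keys: Vector[K] = records.map(_._1)
  // True when every input record fit in the reservoir, i.e. the sample is the whole input.
  def isComplete: Boolean = seen <= sampleSize
}

object PairReservoir {
  // Reservoir sampling (Algorithm R) over (K, V) pairs.
  def sample[K, V](input: Iterator[(K, V)], sampleSize: Int, seed: Long): PairSample[K, V] = {
    require(sampleSize > 0)
    val rng = new Random(seed)
    val reservoir = ArrayBuffer.empty[(K, V)]
    var seen = 0L
    input.foreach { kv =>
      seen += 1
      if (reservoir.length < sampleSize) {
        reservoir += kv
      } else {
        // Keep the new record with probability sampleSize / seen, replacing a random slot.
        val j = (rng.nextDouble() * seen).toLong
        if (j < sampleSize) reservoir(j.toInt) = kv
      }
    }
    PairSample(reservoir.toVector, seen, sampleSize)
  }
}
```

A caller would branch on `isComplete`. When it is true, the caller reuses `records`. Otherwise it falls back to reading the input. The trade-off is memory: keeping V makes each sampled record larger than keeping K alone.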