Hi, why does canUseSerializedShuffle check dependency.aggregator rather than dependency.mapSideCombine? In the base shuffle's SortShuffleWriter, dep.mapSideCombine decides whether dep.aggregator is passed to the sorter or not. *canUseSerializedShuffle* /** * Helper method for determining whether a shuffle should use an optimized serialized shuffle * path or whether it should fall back to the original path that operates on deserialized objects. */ def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = { val shufId = dependency.shuffleId val numPartitions = dependency.partitioner.numPartitions if (!dependency.serializer.supportsRelocationOfSerializedObjects) { log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " + s"${dependency.serializer.getClass.getName}, does not support object relocation") false } else if (*dependency.aggregator.isDefined*) { log.debug( s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined") false } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) { log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " + s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions") false } else { log.debug(s"Can use serialized shuffle for shuffle $shufId") true } } } *SortShuffleWriter* private[spark] class SortShuffleWriter[K, V, C]( ... /** Write a bunch of records to this task's output */ override def write(records: Iterator[Product2[K, V]]): Unit = { sorter = if (*dep.mapSideCombine*) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( context, *dep.aggregator*, Some(dep.partitioner), dep.keyOrdering, dep.serializer) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. 
new ExternalSorter[K, V, V]( context, *aggregator = None*, Some(dep.partitioner), ordering = None, dep.serializer) } sorter.insertAll(records)
-- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org