Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21101#discussion_r182609675
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---
    @@ -167,22 +164,24 @@ object ShuffleExchangeExec {
         val shuffleManager = SparkEnv.get.shuffleManager
         val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
         val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    +    val numParts = partitioner.numPartitions
         if (sortBasedShuffleOn) {
    -      val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
    -      if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
    +      if (numParts <= bypassMergeThreshold) {
             // If we're using the original SortShuffleManager and the number of output partitions is
             // sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
             // doesn't buffer deserialized records.
             // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
             false
    -      } else if (serializer.supportsRelocationOfSerializedObjects) {
    +      } else if (numParts <= SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    --- End diff --
    
    I was almost going to suggest that we check for both conditions with an `&&` here, just as future-proofing in case the `serializer` were ever changed, but I can now see why that isn't necessary: we always use an `UnsafeRowSerializer` here now. It was only in the pre-Tungsten era that we could use either `UnsafeRowSerializer` or `SparkSqlSerializer` here.
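
    For reference, a rough sketch of the `&&` form I had in mind (purely illustrative, reusing `numParts`, `serializer`, and `SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE` from the diff above; not something this PR needs):

        } else if (numParts <= SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE &&
            serializer.supportsRelocationOfSerializedObjects) {
          // Serialized shuffle path: records are sorted in serialized form and never
          // buffered as deserialized objects, so no defensive copy is needed.
          false
        }

    Since `UnsafeRowSerializer` supports relocation of serialized objects, the extra conjunct would always hold today, which is why the partition-count check alone is enough.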

