This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e5728e2fb70 [SPARK-44999][CORE] Refactor `ExternalSorter` to reduce checks on `shouldPartition` when calling `getPartition` e5728e2fb70 is described below commit e5728e2fb706b0b611b371bdb7216acbdfe5c49b Author: yangjie01 <yangji...@baidu.com> AuthorDate: Tue Aug 29 08:30:06 2023 -0700 [SPARK-44999][CORE] Refactor `ExternalSorter` to reduce checks on `shouldPartition` when calling `getPartition` ### What changes were proposed in this pull request? The `getPartition` method checks `shouldPartition` every time it is called. However, `shouldPartition` is a `val` and cannot change after the `ExternalSorter` is instantiated, so this PR makes the following changes to avoid re-checking `shouldPartition` on every call: 1. Added `val actualPartitioner`: when `shouldPartition` is true, it is `partitioner.get`; otherwise it is a new `ConstantPartitioner`, which always returns partition 0 (matching the previous `else 0` branch) and is defined here: https://github.com/apache/spark/blob/df63adf734370f5c2d71a348f9d36658718b302c/core/src/main/scala/org/apache/spark/Partitioner.scala#L156-L162 2. After step 1, the private method `getPartition` could simply delegate to `actualPartitioner.getPartition`. To shorten the call stack, this PR instead removes `getPartition` and replaces both of its call sites in `ExternalSorter` with direct calls to `actualPartitioner.getPartition`. 3. Checked `numPartitions > 1` directly when initializing `val actualPartitioner` and removed `val shouldPartition`, because it is no longer used elsewhere. ### Why are the changes needed? To avoid re-checking `shouldPartition` every time a record's partition is computed. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Passed GitHub Actions. ### Was this patch authored or co-authored using generative AI tooling? No Closes #42713 from LuciferYang/ExternalSorter-partitioner.
Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/util/collection/ExternalSorter.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 7153bb72476..a68e0de70c5 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -102,10 +102,8 @@ private[spark] class ExternalSorter[K, V, C]( private val conf = SparkEnv.get.conf private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) - private val shouldPartition = numPartitions > 1 - private def getPartition(key: K): Int = { - if (shouldPartition) partitioner.get.getPartition(key) else 0 - } + private val actualPartitioner = + if (numPartitions > 1) partitioner.get else new ConstantPartitioner private val blockManager = SparkEnv.get.blockManager private val diskBlockManager = blockManager.diskBlockManager @@ -197,7 +195,7 @@ private[spark] class ExternalSorter[K, V, C]( while (records.hasNext) { addElementsRead() kv = records.next() - map.changeValue((getPartition(kv._1), kv._1), update) + map.changeValue((actualPartitioner.getPartition(kv._1), kv._1), update) maybeSpillCollection(usingMap = true) } } else { @@ -205,7 +203,7 @@ private[spark] class ExternalSorter[K, V, C]( while (records.hasNext) { addElementsRead() val kv = records.next() - buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) + buffer.insert(actualPartitioner.getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) maybeSpillCollection(usingMap = false) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org