This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 22d0d63998 [GLUTEN-11231][CORE] Improve `regeneratePartition` for `PartitionsUtil` (#11232)
22d0d63998 is described below
commit 22d0d63998730375625c18dd413f6cf491ccd2bd
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Dec 3 13:19:20 2025 +0800
[GLUTEN-11231][CORE] Improve `regeneratePartition` for `PartitionsUtil` (#11232)
---
.../org/apache/gluten/utils/PartitionsUtil.scala | 85 +++++++++++-----------
1 file changed, 41 insertions(+), 44 deletions(-)
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
index 0f9c5deb23..b960482490 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
@@ -158,60 +158,57 @@ object PartitionsUtil {
inputPartitions: Seq[FilePartition],
smallFileThreshold: Double): Seq[FilePartition] = {
- // Flatten and sort descending by file size.
- val filesSorted: Seq[(PartitionedFile, Long)] =
- inputPartitions
- .flatMap(_.files)
- .map(f => (f, f.length))
- .sortBy(_._2)(Ordering.Long.reverse)
-
val partitions =
Array.fill(inputPartitions.size)(mutable.ArrayBuffer.empty[PartitionedFile])
def addToBucket(
heap: mutable.PriorityQueue[(Long, Int, Int)],
file: PartitionedFile,
- sz: Long): Unit = {
- val (load, numFiles, idx) = heap.dequeue()
+ fileSize: Long): Unit = {
+ val (size, numFiles, idx) = heap.dequeue()
partitions(idx) += file
- heap.enqueue((load + sz, numFiles + 1, idx))
+ heap.enqueue((size + fileSize, numFiles + 1, idx))
}
- // First by load, then by numFiles.
- val heapByFileSize =
- mutable.PriorityQueue.empty[(Long, Int, Int)](
- Ordering
- .by[(Long, Int, Int), (Long, Int)] {
- case (load, numFiles, _) =>
- (load, numFiles)
- }
- .reverse
- )
+ def initializeHeap(
+ ordering: Ordering[(Long, Int, Int)]): mutable.PriorityQueue[(Long, Int, Int)] = {
+ val heap = mutable.PriorityQueue.empty[(Long, Int, Int)](ordering)
+ inputPartitions.indices.foreach(i => heap.enqueue((0L, 0, i)))
+ heap
+ }
+
+ // Flatten and sort descending by file size.
+ val filesSorted: Seq[(PartitionedFile, Long)] =
+ inputPartitions
+ .flatMap(_.files)
+ .map(f => (f, f.length))
+ .sortBy(_._2)(Ordering.Long.reverse)
+
+ // First by size, then by number of files.
+ val sizeFirstOrdering = Ordering
+ .by[(Long, Int, Int), (Long, Int)] { case (size, numFiles, _) => (size, numFiles) }
+ .reverse
if (smallFileThreshold > 0) {
val smallFileTotalSize = filesSorted.map(_._2).sum * smallFileThreshold
- // First by numFiles, then by load.
- val heapByFileNum =
- mutable.PriorityQueue.empty[(Long, Int, Int)](
- Ordering
- .by[(Long, Int, Int), (Int, Long)] {
- case (load, numFiles, _) =>
- (numFiles, load)
- }
- .reverse
- )
-
- inputPartitions.indices.foreach(i => heapByFileNum.enqueue((0L, 0, i)))
+ // First by number of files, then by size.
+ val numFirstOrdering = Ordering
+ .by[(Long, Int, Int), (Int, Long)] { case (size, numFiles, _) => (numFiles, size) }
+ .reverse
+ val heapByFileNum = initializeHeap(numFirstOrdering)
var numSmallFiles = 0
var smallFileSize = 0L
- // Enqueue small files to the least number of files and the least load.
- filesSorted.reverse.takeWhile(f => f._2 + smallFileSize <= smallFileTotalSize).foreach {
- case (file, sz) =>
- addToBucket(heapByFileNum, file, sz)
- numSmallFiles += 1
- smallFileSize += sz
- }
+ // Distribute small files evenly across partitions to achieve load balancing of small files.
+ filesSorted.reverseIterator
+ .takeWhile(f => f._2 + smallFileSize <= smallFileTotalSize)
+ .foreach {
+ case (file, fileSize) =>
+ addToBucket(heapByFileNum, file, fileSize)
+ numSmallFiles += 1
+ smallFileSize += fileSize
+ }
+ val heapByFileSize = mutable.PriorityQueue.empty[(Long, Int, Int)](sizeFirstOrdering)
// Move buckets from heapByFileNum to heapByFileSize.
while (heapByFileNum.nonEmpty) {
heapByFileSize.enqueue(heapByFileNum.dequeue())
@@ -219,15 +216,15 @@ object PartitionsUtil {
// Finally, enqueue remaining files.
filesSorted.take(filesSorted.size - numSmallFiles).foreach {
- case (file, sz) =>
- addToBucket(heapByFileSize, file, sz)
+ case (file, fileSize) =>
+ addToBucket(heapByFileSize, file, fileSize)
}
} else {
- inputPartitions.indices.foreach(i => heapByFileSize.enqueue((0L, 0, i)))
+ val heapByFileSize = initializeHeap(sizeFirstOrdering)
filesSorted.foreach {
- case (file, sz) =>
- addToBucket(heapByFileSize, file, sz)
+ case (file, fileSize) =>
+ addToBucket(heapByFileSize, file, fileSize)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]