Github user ash211 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20372#discussion_r163464207
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -445,16 +445,25 @@ case class FileSourceScanExec(
           currentSize = 0
         }
     
    -    // Assign files to partitions using "Next Fit Decreasing"
    -    splitFiles.foreach { file =>
    -      if (currentSize + file.length > maxSplitBytes) {
    +    def addFile(file: PartitionedFile): Unit = {
    +        currentFiles += file
    +        currentSize += file.length + openCostInBytes
    +    }
    +
    +    var frontIndex = 0
    +    var backIndex = splitFiles.length - 1
    +
    +    while (frontIndex <= backIndex) {
    +        while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
    +            addFile(splitFiles(frontIndex))
    +            frontIndex += 1
    +        }
    +        while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
    +            addFile(splitFiles(backIndex))
    +            backIndex -= 1
    +        }
             closePartition()
    -      }
    -      // Add the given file to the current partition.
    -      currentSize += file.length + openCostInBytes
    --- End diff --
    
    Saw you added a commit to handle the large non-splittable files case -- can you please add a test for that as well?
    
    want to make sure this while loop never becomes an infinite loop!
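    
    To make the concern concrete, here is a minimal self-contained sketch (my own simplification, not the actual Spark code: `PartitionedFile` is reduced to a bare `Long` length, and the oversized-file guard shown is just one possible fix). Without that guard, a file larger than `maxSplitBytes` stalls both inner loops, `closePartition()` resets `currentSize` to 0, and the outer loop never terminates:
    
    ```scala
    object PackingSketch {
      def pack(
          splitFiles: IndexedSeq[Long],
          maxSplitBytes: Long,
          openCostInBytes: Long): Seq[Seq[Long]] = {
        val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[Long]]
        val currentFiles = scala.collection.mutable.ArrayBuffer.empty[Long]
        var currentSize = 0L
    
        // Flush the partition being built, mirroring closePartition() in the diff.
        def closePartition(): Unit = {
          if (currentFiles.nonEmpty) partitions += currentFiles.toList
          currentFiles.clear()
          currentSize = 0
        }
    
        def addFile(length: Long): Unit = {
          currentFiles += length
          currentSize += length + openCostInBytes
        }
    
        var frontIndex = 0
        var backIndex = splitFiles.length - 1
    
        while (frontIndex <= backIndex) {
          // Guard against the infinite loop: a file that can never fit must be
          // taken unconditionally, otherwise neither index ever advances and
          // closePartition() keeps resetting currentSize to 0 forever.
          if (currentSize == 0 && splitFiles(frontIndex) > maxSplitBytes) {
            addFile(splitFiles(frontIndex))
            frontIndex += 1
          }
          while (frontIndex <= backIndex &&
              currentSize + splitFiles(frontIndex) <= maxSplitBytes) {
            addFile(splitFiles(frontIndex))
            frontIndex += 1
          }
          while (backIndex > frontIndex &&
              currentSize + splitFiles(backIndex) <= maxSplitBytes) {
            addFile(splitFiles(backIndex))
            backIndex -= 1
          }
          closePartition()
        }
        closePartition() // a no-op here, but matches the structure of the diff
        partitions.toSeq
      }
    
      def main(args: Array[String]): Unit = {
        // Without the guard, this input would spin forever: 300 can never fit
        // in 128. Expected partitions: [300] and [50, 40].
        println(pack(IndexedSeq(300L, 50L, 40L), maxSplitBytes = 128L, openCostInBytes = 4L))
      }
    }
    ```
    
    A test with a single file larger than `maxSplitBytes`, and another mixing such a file with small ones, would pin this behavior down.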

