[GitHub] spark pull request #20372: Improved block merging logic for partitions
Github user ash211 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20372#discussion_r163464207

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }

-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
         closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

saw you added a commit to handle the large non-splittable files case -- can you please add a test for that also? want to make sure this while loop never becomes an infinite loop!
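A test along the lines requested could look roughly like the sketch below. It assumes the suite's existing createTable/checkScan helpers and a fixture whose FileFormat reports isSplitable = false; the file names, sizes, and test title are hypothetical and not taken from this PR.

    test("non-splittable file larger than maxSplitBytes still terminates and gets its own partition") {
      // Hypothetical sizes: file1 cannot be split and is bigger than the 10-byte target below.
      val table = createTable(files = Seq(
        "file1" -> 15,
        "file2" -> 4))
      withSQLConf(
          SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
          SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
        checkScan(table.select('c1)) { partitions =>
          // Every produced partition must be non-empty, i.e. the packing loop made progress...
          assert(partitions.forall(_.files.nonEmpty), "when checking for empty partitions")
          // ...and the oversized file should end up alone in its own partition.
          assert(partitions.exists(p => p.files.size == 1 && p.files.head.length > 10),
            "when checking the oversized file")
        }
      }
    }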
[GitHub] spark pull request #20372: Improved block merging logic for partitions
Github user glentakahashi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20372#discussion_r163427261

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }

-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
         closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

Oh, only if the file `isSplittable`, so yea, will fail for non-splittable files. Will update code
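One possible shape for that update, sketched against the loop in the diff above (it reuses addFile, closePartition, currentFiles, splitFiles, and maxSplitBytes from there, and is not necessarily what the actual follow-up commit does): if neither inner loop managed to add anything, take the front file anyway so frontIndex always advances.

    while (frontIndex <= backIndex) {
      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
        addFile(splitFiles(frontIndex))
        frontIndex += 1
      }
      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
        addFile(splitFiles(backIndex))
        backIndex -= 1
      }
      // Nothing fit this iteration: splitFiles(frontIndex) must be a non-splittable file
      // longer than maxSplitBytes, so emit it as its own partition instead of looping forever.
      if (currentFiles.isEmpty && frontIndex <= backIndex) {
        addFile(splitFiles(frontIndex))
        frontIndex += 1
      }
      closePartition()
    }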
[GitHub] spark pull request #20372: Improved block merging logic for partitions
Github user glentakahashi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20372#discussion_r163426554

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }

-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
         closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

This test still passes: https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L101, and the splitFiles should already have been split by `filesMaxPartitionBytes`. I originally had similar functionality in, but I /think/ it's redundant
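For readers following along: splittable files cannot exceed the limit because, upstream of this loop, FileSourceScanExec already slices them by maxSplitBytes (derived from spark.sql.files.maxPartitionBytes) before sorting. Schematically it looks something like the simplified, from-memory sketch below; the exact code is not part of this diff, and the `isSplitable(file)` helper stands in for the real FileFormat check. Only non-splittable files can come through longer than maxSplitBytes.

    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        if (isSplitable(file)) { // simplified: the real check consults the FileFormat
          // Splittable files are chopped into chunks of at most maxSplitBytes,
          // so every resulting PartitionedFile fits under the partition target.
          (0L until file.getLen by maxSplitBytes).map { offset =>
            val size = math.min(maxSplitBytes, file.getLen - offset)
            PartitionedFile(partition.values, file.getPath.toUri.toString, offset, size)
          }
        } else {
          // Non-splittable files pass through whole, however large they are.
          Seq(PartitionedFile(partition.values, file.getPath.toUri.toString, 0, file.getLen))
        }
      }
    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)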
[GitHub] spark pull request #20372: Improved block merging logic for partitions
Github user ash211 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20372#discussion_r163419745

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---
@@ -142,15 +142,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
-        assert(partitions.size == 4, "when checking partitions")
-        assert(partitions(0).files.size == 1, "when checking partition 1")
+        assert(partitions.size == 3, "when checking partitions")
+        assert(partitions(0).files.size == 2, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
         assert(partitions(2).files.size == 2, "when checking partition 3")
-        assert(partitions(3).files.size == 1, "when checking partition 4")

         // First partition reads (file1)
--- End diff --

comment is stale now
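For intuition about why the partition count drops from 4 to 3 (hypothetical sizes, not the suite's actual fixtures, and ignoring the 1-byte open cost): with maxSplitBytes = 10 and files of sizes 9, 6, 5, 5, 4, 1, the old "Next Fit Decreasing" pass yields (9), (6), (5, 5), (4, 1) -- four partitions, because once a large file forces a partition to close, the small trailing files are never revisited. Packing from both ends of the size-sorted list instead fills the leftover space with the smallest remaining files, yielding (9, 1), (6, 4), (5, 5) -- three partitions.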
[GitHub] spark pull request #20372: Improved block merging logic for partitions
Github user ash211 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20372#discussion_r163424784

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }

-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
         closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

should add a test for this too
[GitHub] spark pull request #20372: Improved block merging logic for partitions
Github user ash211 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20372#discussion_r163419675

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---
@@ -142,15 +142,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
--- End diff --

comment is stale now
[GitHub] spark pull request #20372: Improved block merging logic for partitions
Github user ash211 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20372#discussion_r163424415

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }

-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
         closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

this handles files that are larger than the maxSplitBytes, which I think isn't done in the new algorithm. Need to make sure those don't form an infinite loop
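To spell out the failure mode with hypothetical numbers: with maxSplitBytes = 10 and a non-splittable splitFiles(frontIndex) of length 12 at the start of an iteration, currentSize is 0 and 0 + 12 > 10, so neither inner while adds anything; frontIndex and backIndex never move, closePartition() has nothing new to close, and the outer while (frontIndex <= backIndex) spins forever. The removed foreach-based code never hit this, because it always accounted for the file after (possibly) closing the current partition, so an oversized file simply became its own partition.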
[GitHub] spark pull request #20372: Improved block merging logic for partitions
GitHub user glentakahashi opened a pull request:

    https://github.com/apache/spark/pull/20372

    Improved block merging logic for partitions

## What changes were proposed in this pull request?

Change DataSourceScanExec so that, when grouping blocks together into partitions, it also checks the end of the sorted list of splits to more efficiently fill out partitions.

## How was this patch tested?

Updated the old test to reflect the new logic, which causes the # of partitions to drop from 4 -> 3.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/glentakahashi/spark feature/improved-block-merging

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20372.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20372

commit c575977a5952bf50b605be8079c9be1e30f3bd36
Author: Glen Takahashi
Date:   2018-01-23T23:22:34Z

    Improved block merging logic for partitions
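A minimal, self-contained sketch of the packing strategy described above (simplified from the diff in this PR; Split is a stand-in for PartitionedFile, and the final "nothing fit" guard is only one possible way to handle non-splittable files larger than maxSplitBytes, a case raised elsewhere in this thread):

    // Simplified sketch of the "fill from both ends" packing. Assumes `splits`
    // is sorted by length in decreasing order, as in FileSourceScanExec.
    case class Split(name: String, length: Long)

    def pack(splits: IndexedSeq[Split], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Split]] = {
      val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[Split]]
      val current = scala.collection.mutable.ArrayBuffer.empty[Split]
      var currentSize = 0L

      def closePartition(): Unit = {
        if (current.nonEmpty) partitions += current.toList
        current.clear()
        currentSize = 0L
      }

      def addFile(s: Split): Unit = {
        current += s
        currentSize += s.length + openCostInBytes
      }

      var front = 0
      var back = splits.length - 1
      while (front <= back) {
        // Take large splits from the front while they fit...
        while (front <= back && currentSize + splits(front).length <= maxSplitBytes) {
          addFile(splits(front)); front += 1
        }
        // ...then top the partition up with small splits from the back.
        while (back > front && currentSize + splits(back).length <= maxSplitBytes) {
          addFile(splits(back)); back -= 1
        }
        // Nothing fit at all: take the front split anyway so the loop always progresses.
        if (current.isEmpty && front <= back) {
          addFile(splits(front)); front += 1
        }
        closePartition()
      }
      partitions.toList
    }

    // Example: pack(Vector(9, 6, 5, 5, 4, 1).map(n => Split(s"f$n", n)), maxSplitBytes = 10, openCostInBytes = 0)
    // groups the splits as (9, 1), (6, 4), (5, 5) -- three partitions instead of the four
    // that the old "Next Fit Decreasing" pass would produce.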