[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20372#discussion_r163464207
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
         closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

Saw you added a commit to handle the large non-splittable files case -- can you please add a test for that also?

want to make sure this while loop never becomes an infinite loop!
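
For illustration, something along these lines might do it -- a rough sketch only, reusing the suite's existing `createTable`/`checkScan` helpers and assuming a source whose `isSplitable` returns false, so `file2` cannot be chopped up to fit under `maxSplitBytes`:

```scala
test("large non-splittable files are isolated in their own partitions") {
  // Hypothetical sizes: file2 is far larger than maxSplitBytes below.
  val table = createTable(files = Seq(
    "file1" -> 3,
    "file2" -> 100,
    "file3" -> 3))

  withSQLConf(
    SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
    SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
    checkScan(table.select('c1)) { partitions =>
      // The oversized file must land in a partition by itself instead of
      // stalling the front/back packing loop forever.
      assert(partitions.exists(_.files.exists(_.length >= 100)))
      assert(partitions.forall(_.files.nonEmpty))
    }
  }
}
```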


---




[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread glentakahashi
Github user glentakahashi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20372#discussion_r163427261
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
         closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

Oh, only if the file `isSplittable` -- so yeah, this will fail for non-splittable files. Will update the code.


---




[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread glentakahashi
Github user glentakahashi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20372#discussion_r163426554
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
         closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

This test still passes: https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L101, and `splitFiles` should already have been split by `filesMaxPartitionBytes`.

I originally had similar functionality in, but I *think* it's redundant.
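
To spell out why (a toy sketch with made-up types, not Spark's actual code): upstream of the packing loop, splittable files are sliced into chunks of at most `maxSplitBytes`, so only non-splittable files can ever exceed that bound.

```scala
case class Chunk(path: String, start: Long, length: Long)

// Toy version of the pre-splitting step: a splittable file can never yield
// a chunk longer than maxSplitBytes; non-splittable files pass through whole.
def split(path: String, fileLen: Long, splittable: Boolean,
    maxSplitBytes: Long): Seq[Chunk] =
  if (splittable) {
    (0L until fileLen by maxSplitBytes).map { offset =>
      Chunk(path, offset, math.min(maxSplitBytes, fileLen - offset))
    }
  } else {
    Seq(Chunk(path, 0L, fileLen)) // may exceed maxSplitBytes!
  }
```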


---




[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20372#discussion_r163419745
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---
@@ -142,15 +142,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
-        assert(partitions.size == 4, "when checking partitions")
-        assert(partitions(0).files.size == 1, "when checking partition 1")
+        assert(partitions.size == 3, "when checking partitions")
+        assert(partitions(0).files.size == 2, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
         assert(partitions(2).files.size == 2, "when checking partition 3")
-        assert(partitions(3).files.size == 1, "when checking partition 4")
 
         // First partition reads (file1)
--- End diff --

comment is stale now


---




[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20372#discussion_r163424784
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
         closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

should add a test for this too


---




[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20372#discussion_r163419675
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---
@@ -142,15 +142,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
--- End diff --

comment is stale now


---




[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20372#discussion_r163424415
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
@@ -445,16 +445,25 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
+    def addFile(file: PartitionedFile): Unit = {
+      currentFiles += file
+      currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
+      }
         closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
--- End diff --

This handles files that are larger than maxSplitBytes, which I think isn't handled in the new algorithm. Need to make sure those don't form an infinite loop.
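
For example (a sketch of one possible guard, not necessarily the fix this PR ends up with): if neither inner loop managed to add a file, the file at `frontIndex` alone exceeds `maxSplitBytes`, so forcing it into its own partition guarantees progress. This assumes `closePartition()` clears `currentFiles` along with `currentSize`.

```scala
while (frontIndex <= backIndex) {
  while (frontIndex <= backIndex &&
      currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
    addFile(splitFiles(frontIndex))
    frontIndex += 1
  }
  while (backIndex > frontIndex &&
      currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
    addFile(splitFiles(backIndex))
    backIndex -= 1
  }
  if (currentFiles.isEmpty && frontIndex <= backIndex) {
    // Nothing fit: the next file is oversized (e.g. large and
    // non-splittable). Emit it alone rather than spinning forever.
    addFile(splitFiles(frontIndex))
    frontIndex += 1
  }
  closePartition()
}
```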


---




[GitHub] spark pull request #20372: Improved block merging logic for partitions

2018-01-23 Thread glentakahashi
GitHub user glentakahashi opened a pull request:

https://github.com/apache/spark/pull/20372

Improved block merging logic for partitions

## What changes were proposed in this pull request?

Change DataSourceScanExec so that, when grouping blocks together into partitions, it also checks the end of the sorted list of splits to fill out partitions more efficiently.
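
To make the strategy concrete, here is a small self-contained trace (toy sizes, zero open cost -- not code from the PR): each partition is filled from the front of the size-sorted list, then topped up with small files from the back.

```scala
object BlockMergingDemo extends App {
  val maxSplitBytes = 10L
  val sizes = Array(8L, 5L, 3L, 2L, 2L) // sorted descending

  val partitions = scala.collection.mutable.ArrayBuffer.empty[List[Long]]
  var current = List.empty[Long]
  var currentSize = 0L
  var front = 0
  var back = sizes.length - 1

  while (front <= back) {
    // Take the largest remaining files that still fit...
    while (front <= back && currentSize + sizes(front) <= maxSplitBytes) {
      current :+= sizes(front); currentSize += sizes(front); front += 1
    }
    // ...then top the partition up with the smallest remaining files.
    while (back > front && currentSize + sizes(back) <= maxSplitBytes) {
      current :+= sizes(back); currentSize += sizes(back); back -= 1
    }
    partitions += current
    current = List.empty; currentSize = 0L
  }

  // Prints List(8, 2) then List(5, 3, 2): two partitions, where the old
  // sequential "Next Fit Decreasing" pass would produce three
  // ([8], [5, 3, 2], [2]).
  partitions.foreach(println)
}
```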

## How was this patch tested?

Updated the old test to reflect the new logic, which causes the number of partitions to drop from 4 to 3.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/glentakahashi/spark feature/improved-block-merging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20372.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20372


commit c575977a5952bf50b605be8079c9be1e30f3bd36
Author: Glen Takahashi 
Date:   2018-01-23T23:22:34Z

Improved block merging logic for partitions




---
