This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 317a87ba9e90 [SPARK-53797][SQL] Fix `FileStreamSource.takeFilesUntilMax` to use `zipWithIndex` to avoid `indices` usage
317a87ba9e90 is described below

commit 317a87ba9e90a2c0f77aa8a3700d122a7c1f2b95
Author: Adam Binford <[email protected]>
AuthorDate: Fri Nov 21 13:47:53 2025 -0800

    [SPARK-53797][SQL] Fix `FileStreamSource.takeFilesUntilMax` to use `zipWithIndex` to avoid `indices` usage
    
    ### What changes were proposed in this pull request?
    
    Fixes a performance issue with the new `maxBytesPerTrigger` option for file stream sources introduced in https://github.com/apache/spark/pull/44636. When `maxBytesPerTrigger` is used with file stream sources, the files are now traversed by iterating over the list instead of indexing into it while calculating offsets.
    
    ### Why are the changes needed?
    
    We tried out this new option and found that streams reading from tables with a lot of files (in the millions) were spending hours trying to construct batches. After looking at the thread dump for these, I could see that the `files` object was stored as a Scala `immutable.List`, which is a linked list under the hood with `O(n)` lookup time by index, making the `takeFilesUntilMax` method an `O(n^2)` operation. Instead, the list should simply be iterated over, which makes it a simple `O(n)` op [...]
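
The difference between the two traversals can be illustrated with a minimal sketch. It is not the Spark implementation: `FileEntry` and `TakeUntilMaxSketch` are hypothetical names, and the `else` branch is an assumption because the diff does not show it. `byIndex` calls `files(i)`, which walks `i` nodes of an immutable `List` on every step, while `byIteration` visits each node once through `zipWithIndex`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's NewFileEntry; only `size` matters here.
final case class FileEntry(path: String, size: Long)

object TakeUntilMaxSketch {
  // Indexed traversal: files(i) is O(i) on an immutable List,
  // so the whole loop is O(n^2) in the number of files.
  def byIndex(files: List[FileEntry], maxSize: Long): (Seq[FileEntry], Seq[FileEntry]) = {
    var lSize = BigInt(0)
    val lFiles = ArrayBuffer[FileEntry]()
    val rFiles = ArrayBuffer[FileEntry]()
    for (i <- files.indices) {
      val file = files(i)
      val newSize = lSize + file.size
      if (i == 0 || rFiles.isEmpty && newSize <= Long.MaxValue && newSize <= maxSize) {
        lSize += file.size
        lFiles += file
      } else {
        // Assumed behavior: everything past the cutoff goes to the remainder.
        rFiles += file
      }
    }
    (lFiles.toSeq, rFiles.toSeq)
  }

  // Iterated traversal: each list node is visited once, so the loop is O(n).
  def byIteration(files: List[FileEntry], maxSize: Long): (Seq[FileEntry], Seq[FileEntry]) = {
    var lSize = BigInt(0)
    val lFiles = ArrayBuffer[FileEntry]()
    val rFiles = ArrayBuffer[FileEntry]()
    files.zipWithIndex.foreach { case (file, i) =>
      val newSize = lSize + file.size
      if (i == 0 || rFiles.isEmpty && newSize <= Long.MaxValue && newSize <= maxSize) {
        lSize += file.size
        lFiles += file
      } else {
        // Same assumed remainder handling as in byIndex.
        rFiles += file
      }
    }
    (lFiles.toSeq, rFiles.toSeq)
  }
}
```

Both methods return the same split for the same input; only the cost of reaching each element differs. In this sketch, `zipWithIndex` builds one intermediate list of pairs, which is linear extra work and does not change the overall `O(n)` bound.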
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, just performance.
    
    ### How was this patch tested?
    
    No new tests, purely a performance improvement.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52515 from Kimahriman/max-bytes-per-trigger-iter.
    
    Authored-by: Adam Binford <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit a870ea459712ad5e8d887e6ca8b7bb16b79d0130)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/sql/execution/streaming/FileStreamSource.scala    | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index f0debce44e37..a13b47ce0f87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -147,8 +147,7 @@ class FileStreamSource(
     var rSize = BigInt(0)
     val lFiles = ArrayBuffer[NewFileEntry]()
     val rFiles = ArrayBuffer[NewFileEntry]()
-    for (i <- files.indices) {
-      val file = files(i)
+    files.zipWithIndex.foreach { case (file, i) =>
       val newSize = lSize + file.size
       if (i == 0 || rFiles.isEmpty && newSize <= Long.MaxValue && newSize <= maxSize) {
         lSize += file.size


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
