This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 0669bc8905df [SPARK-53797][SQL] Fix `FileStreamSource.takeFilesUntilMax` to use `zipWithIndex` to avoid `indices` usage
0669bc8905df is described below
commit 0669bc8905df69156b99dddc65d5f4469775423e
Author: Adam Binford <[email protected]>
AuthorDate: Fri Nov 21 13:47:53 2025 -0800
    [SPARK-53797][SQL] Fix `FileStreamSource.takeFilesUntilMax` to use `zipWithIndex` to avoid `indices` usage
### What changes were proposed in this pull request?
    Fixes a performance issue with the new `maxBytesPerTrigger` option for file stream sources introduced in https://github.com/apache/spark/pull/44636. When `maxBytesPerTrigger` is used, the files considered for an offset are now traversed by iteration instead of by list indexing.
### Why are the changes needed?
    We tried out this new option and found that streams reading tables with a lot of files (in the millions) were spending hours trying to construct batches. Looking at the thread dump, I could see the `files` object was stored as a Scala `immutable.List`, which is a linked list under the hood with `O(n)` lookup time by index, making the `takeFilesUntilMax` method an `O(n^2)` operation. Instead, the list should simply be iterated over, which makes it a simple `O(n)` op [...]
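To illustrate the difference (a minimal standalone sketch, not the actual Spark code; the object and method names here are made up for demonstration): on a Scala `List`, `files(i)` walks the linked list from the head, so summing by index is quadratic, while a single `zipWithIndex` pass stays linear and still exposes the index.

```scala
// Sketch of the performance difference between indexed lookup and
// iteration on a Scala immutable.List (a singly linked list).
object IterationSketch {
  // Sum sizes via index lookup: each files(i) is an O(i) traversal,
  // so the whole loop is O(n^2) on a List.
  def sumByIndex(files: List[Long]): Long = {
    var total = 0L
    for (i <- files.indices) {
      total += files(i)
    }
    total
  }

  // Sum sizes in a single pass while keeping the index available,
  // as the fix does with zipWithIndex: O(n) overall.
  def sumByIteration(files: List[Long]): Long = {
    var total = 0L
    files.zipWithIndex.foreach { case (size, _) =>
      total += size
    }
    total
  }
}
```

Both methods compute the same result; only the traversal cost differs, which is why the fix is purely a performance change.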
### Does this PR introduce _any_ user-facing change?
No, just performance.
### How was this patch tested?
No new tests, purely a performance improvement.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52515 from Kimahriman/max-bytes-per-trigger-iter.
Authored-by: Adam Binford <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit a870ea459712ad5e8d887e6ca8b7bb16b79d0130)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/execution/streaming/runtime/FileStreamSource.scala | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala
index d5503f1c247d..9847bd9d7644 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala
@@ -149,8 +149,7 @@ class FileStreamSource(
var rSize = BigInt(0)
val lFiles = ArrayBuffer[NewFileEntry]()
val rFiles = ArrayBuffer[NewFileEntry]()
- for (i <- files.indices) {
- val file = files(i)
+ files.zipWithIndex.foreach { case (file, i) =>
val newSize = lSize + file.size
       if (i == 0 || rFiles.isEmpty && newSize <= Long.MaxValue && newSize <= maxSize) {
lSize += file.size
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]