This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f5faaa24e3a [SPARK-46641][SS] Add maxBytesPerTrigger threshold
3f5faaa24e3a is described below

commit 3f5faaa24e3ab4d9cc8f996bd1938573dd057e20
Author: maxim_konstantinov <maxim_konstanti...@apple.com>
AuthorDate: Thu Feb 8 23:16:17 2024 -0800

    [SPARK-46641][SS] Add maxBytesPerTrigger threshold
    
    ### What changes were proposed in this pull request?
    This PR adds the [Input Streaming
    Source](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources)
    option `maxBytesPerTrigger`, which limits the total size of the input files
    in a streaming batch. The semantics of `maxBytesPerTrigger` are very close
    to those of the existing `maxFilesPerTrigger` option.
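
    A minimal usage sketch of the new option (the input path and the byte
    figure are illustrative placeholders, not taken from this patch):
    ```scala
    // Caps the total size of new files admitted in each micro-batch; the value
    // must be a positive integer number of bytes.
    val events = spark.readStream
      .option("maxBytesPerTrigger", (512L * 1024 * 1024).toString) // ~512 MiB
      .text("/data/events")                                        // placeholder path
    ```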
    
    #### How was the feature implemented?
    Because `maxBytesPerTrigger` is semantically close to `maxFilesPerTrigger`,
    I used every `maxFilesPerTrigger` usage in the repository as a potential
    place requiring changes. That includes:
    - the option parameter definition
    - the option-related logic
    - the option-related ScalaDoc and MD files
    - the option-related tests
    
    I also went over all usages of `maxFilesPerTrigger` in
    `FileStreamSourceSuite` and implemented the `maxBytesPerTrigger` tests in
    the same fashion, since the two options are very close in nature. From the
    structure and elements of `ReadLimit` I concluded that the current design
    implies only one simple rule per `ReadLimit`, so setting both
    `maxFilesPerTrigger` and `maxBytesPerTrigger` at the same time is
    explicitly prohibited (see the sketch below).
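
    To illustrate the mutual-exclusion rule, a sketch mirroring the new test
    (the input path and sink are placeholders):
    ```scala
    // Setting both thresholds on the same source is rejected when the source is
    // created: the query fails with an IllegalArgumentException stating that the
    // options "can't be both set at the same time".
    val df = spark.readStream
      .option("maxFilesPerTrigger", "1")
      .option("maxBytesPerTrigger", "1")
      .text("/tmp/input")                          // placeholder path
    val q = df.writeStream.format("console").start()
    try q.processAllAvailable() finally q.stop()   // surfaces a StreamingQueryException
    ```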
    
    ### Why are the changes needed?
    This feature is useful for our team and our sister teams, and we expect it
    to find broad acceptance among Spark users. In a few of the Spark pipelines
    we support, we use the Available-now trigger for periodic processing with
    Spark Streaming. We currently rely on the `maxFilesPerTrigger` threshold,
    but this is not ideal: input file sizes may change over time, which requires
    periodic adjustment of `maxFilesPerTrigger`. Computational complexity of
    the job depe [...]
    
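    A rough sketch of that Available-now use case (paths, checkpoint location,
    and the size threshold are illustrative assumptions):
    ```scala
    import org.apache.spark.sql.streaming.Trigger

    // Each run drains all available input in size-bounded micro-batches and then
    // stops on its own, which suits periodic (e.g. cron-driven) processing.
    val query = spark.readStream
      .option("maxBytesPerTrigger", (1024L * 1024 * 1024).toString) // ~1 GiB per micro-batch
      .text("/data/incoming")                                       // placeholder input path
      .writeStream
      .trigger(Trigger.AvailableNow())
      .option("checkpointLocation", "/checkpoints/incoming")        // placeholder
      .format("parquet")
      .start("/data/processed")                                     // placeholder output path

    query.awaitTermination()
    ```
    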
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    New unit tests were added and existing `maxFilesPerTrigger` tests were
    extended. I searched for `maxFilesPerTrigger`-related tests and either added
    new tests or extended existing ones, trying to minimize and simplify the
    changes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44636 from MaxNevermind/streaming-add-maxBytesPerTrigger-option.
    
    Lead-authored-by: maxim_konstantinov <maxim_konstanti...@apple.com>
    Co-authored-by: Max Konstantinov <konstantinov.ma...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/streaming/DataStreamReader.scala     |  24 +++-
 docs/structured-streaming-programming-guide.md     |   8 +-
 .../sql/connector/read/streaming/ReadLimit.java    |   2 +
 .../{ReadLimit.java => ReadMaxBytes.java}          |  39 +++---
 .../execution/streaming/FileStreamOptions.scala    |  18 ++-
 .../sql/execution/streaming/FileStreamSource.scala |  87 +++++++++++---
 .../spark/sql/streaming/DataStreamReader.scala     |  12 ++
 .../sql/streaming/FileStreamSourceSuite.scala      | 133 +++++++++++++++------
 8 files changed, 247 insertions(+), 76 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index bc8e30cd300c..789425c9daea 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -159,7 +159,9 @@ final class DataStreamReader private[sql] (sparkSession: 
SparkSession) extends L
    * schema in advance, use the version that specifies the schema to avoid the 
extra scan.
    *
    * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
-   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   * sets the maximum number of new files to be considered in every 
trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files to
+   * be considered in every trigger.</li> </ul>
    *
    * You can find the JSON-specific options for reading JSON file stream in <a
    * 
href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option";>
@@ -179,7 +181,9 @@ final class DataStreamReader private[sql] (sparkSession: 
SparkSession) extends L
    * specify the schema explicitly using `schema`.
    *
    * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
-   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   * sets the maximum number of new files to be considered in every 
trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files to
+   * be considered in every trigger.</li> </ul>
    *
    * You can find the CSV-specific options for reading CSV file stream in <a
    * 
href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option";>
@@ -197,7 +201,9 @@ final class DataStreamReader private[sql] (sparkSession: 
SparkSession) extends L
    * specify the schema explicitly using `schema`.
    *
    * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
-   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   * sets the maximum number of new files to be considered in every 
trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files to
+   * be considered in every trigger.</li> </ul>
    *
    * You can find the XML-specific options for reading XML file stream in <a
    * 
href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option";>
@@ -211,7 +217,9 @@ final class DataStreamReader private[sql] (sparkSession: 
SparkSession) extends L
    * Loads a ORC file stream, returning the result as a `DataFrame`.
    *
    * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
-   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   * sets the maximum number of new files to be considered in every 
trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files to
+   * be considered in every trigger.</li> </ul>
    *
    * ORC-specific option(s) for reading ORC file stream can be found in <a 
href=
    * 
"https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option";>
 Data
@@ -225,7 +233,9 @@ final class DataStreamReader private[sql] (sparkSession: 
SparkSession) extends L
    * Loads a Parquet file stream, returning the result as a `DataFrame`.
    *
    * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
-   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   * sets the maximum number of new files to be considered in every 
trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files to
+   * be considered in every trigger.</li> </ul>
    *
    * Parquet-specific option(s) for reading Parquet file stream can be found 
in <a href=
    * 
"https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option";>
 Data
@@ -268,7 +278,9 @@ final class DataStreamReader private[sql] (sparkSession: 
SparkSession) extends L
    * }}}
    *
    * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` 
(default: no max limit):
-   * sets the maximum number of new files to be considered in every 
trigger.</li> </ul>
+   * sets the maximum number of new files to be considered in every 
trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files to
+   * be considered in every trigger.</li> </ul>
    *
    * You can find the text-specific options for reading text files in <a
    * 
href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option";>
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 7a4249f9d6fc..2dc7272a4f15 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -561,6 +561,8 @@ Here are the details of all the sources in Spark.
         <br/>
         <code>maxFilesPerTrigger</code>: maximum number of new files to be 
considered in every trigger (default: no max)
         <br/>
+        <code>maxBytesPerTrigger</code>: maximum total size of new files to be 
considered in every trigger (default: no max). <code>maxBytesPerTrigger</code> 
and <code>maxFilesPerTrigger</code> can't both be set at the same time, only 
one of two must be chosen. Note that a stream always reads at least one file so 
it can make progress and not get stuck on a file larger than a given maximum.
+        <br/>
         <code>latestFirst</code>: whether to process the latest new files 
first, useful when there is a large backlog of files (default: false)
         <br/>
         <code>fileNameOnly</code>: whether to check new files based on only 
the filename instead of on the full path (default: false). With this set to 
`true`, the following files would be considered as the same file, because their 
filenames, "dataset.txt", are the same:
@@ -570,7 +572,7 @@ Here are the details of all the sources in Spark.
         "s3n://a/b/dataset.txt"<br/>
         "s3a://a/b/c/dataset.txt"
         <br/>
-        <code>maxFileAge</code>: Maximum age of a file that can be found in 
this directory, before it is ignored. For the first batch all files will be 
considered valid. If <code>latestFirst</code> is set to `true` and 
<code>maxFilesPerTrigger</code> is set, then this parameter will be ignored, 
because old files that are valid, and should be processed, may be ignored. The 
max age is specified with respect to the timestamp of the latest file, and not 
the timestamp of the current system.(d [...]
+        <code>maxFileAge</code>: Maximum age of a file that can be found in 
this directory, before it is ignored. For the first batch all files will be 
considered valid. If <code>latestFirst</code> is set to `true` and 
<code>maxFilesPerTrigger</code> or <code>maxBytesPerTrigger</code> is set, then 
this parameter will be ignored, because old files that are valid, and should be 
processed, may be ignored. The max age is specified with respect to the 
timestamp of the latest file, and not the [...]
         <br/>
         <code>cleanSource</code>: option to clean up completed files after 
processing.<br/>
         Available options are "archive", "delete", "off". If the option is not 
provided, the default value is "off".<br/>
@@ -3272,8 +3274,8 @@ Here are the different kinds of triggers that are 
supported.
     <td>
         Similar to queries one-time micro-batch trigger, the query will 
process all the available data and then
         stop on its own. The difference is that, it will process the data in 
(possibly) multiple micro-batches
-        based on the source options (e.g. <code>maxFilesPerTrigger</code> for 
file source), which will result
-        in better query scalability.
+        based on the source options (e.g. <code>maxFilesPerTrigger</code> or 
<code>maxBytesPerTrigger</code> for file 
+        source), which will result in better query scalability.
         <ul>
             <li>This trigger provides a strong guarantee of processing: 
regardless of how many batches were
                 left over in previous run, it ensures all available data at 
the time of execution gets
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
index 538adeb4ad79..a36412feee02 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
@@ -39,6 +39,8 @@ public interface ReadLimit {
 
   static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); }
 
+  static ReadLimit maxBytes(long bytes) { return new ReadMaxBytes(bytes); }
+
   static ReadLimit allAvailable() { return ReadAllAvailable.INSTANCE; }
 
   static ReadLimit compositeLimit(ReadLimit[] readLimits) {
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java
similarity index 53%
copy from 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
copy to 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java
index 538adeb4ad79..2e3c5627af4a 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java
@@ -20,28 +20,37 @@ package org.apache.spark.sql.connector.read.streaming;
 import org.apache.spark.annotation.Evolving;
 
 /**
- * Interface representing limits on how much to read from a {@link 
MicroBatchStream} when it
- * implements {@link SupportsAdmissionControl}. There are several child 
interfaces representing
- * various kinds of limits.
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should 
scan files which total
+ * size doesn't go beyond a given maximum total size. Always reads at least 
one file so a stream
+ * can make progress and not get stuck on a file larger than a given maximum.
  *
  * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
- * @see ReadAllAvailable
- * @see ReadMaxRows
- * @since 3.0.0
+ * @since 4.0.0
  */
 @Evolving
-public interface ReadLimit {
-  static ReadLimit minRows(long rows, long maxTriggerDelayMs) {
-    return new ReadMinRows(rows, maxTriggerDelayMs);
-  }
+public class ReadMaxBytes implements ReadLimit {
+  private long bytes;
 
-  static ReadLimit maxRows(long rows) { return new ReadMaxRows(rows); }
+  ReadMaxBytes(long bytes) {
+    this.bytes = bytes;
+  }
 
-  static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); }
+  /** Maximum total size of files to scan. */
+  public long maxBytes() { return this.bytes; }
 
-  static ReadLimit allAvailable() { return ReadAllAvailable.INSTANCE; }
+  @Override
+  public String toString() {
+    return "MaxBytes: " + maxBytes();
+  }
 
-  static ReadLimit compositeLimit(ReadLimit[] readLimits) {
-    return new CompositeReadLimit(readLimits);
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ReadMaxBytes other = (ReadMaxBytes) o;
+    return other.maxBytes() == maxBytes();
   }
+
+  @Override
+  public int hashCode() { return Long.hashCode(bytes); }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index ae0909559086..07c1ccc432cd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -50,11 +50,25 @@ class FileStreamOptions(parameters: 
CaseInsensitiveMap[String]) extends Logging
     }
   }
 
+  val maxBytesPerTrigger: Option[Long] = 
parameters.get("maxBytesPerTrigger").map { str =>
+    Try(str.toLong).toOption.filter(_ > 0).map(op =>
+      if (maxFilesPerTrigger.nonEmpty) {
+        throw new IllegalArgumentException(
+          "Options 'maxFilesPerTrigger' and 'maxBytesPerTrigger' " +
+            "can't be both set at the same time")
+      } else op
+    ).getOrElse {
+      throw new IllegalArgumentException(
+        s"Invalid value '$str' for option 'maxBytesPerTrigger', must be a 
positive integer")
+    }
+  }
+
   /**
    * Maximum age of a file that can be found in this directory, before it is 
ignored. For the
    * first batch all files will be considered valid. If `latestFirst` is set 
to `true` and
-   * `maxFilesPerTrigger` is set, then this parameter will be ignored, because 
old files that are
-   * valid, and should be processed, may be ignored. Please refer to 
SPARK-19813 for details.
+   * `maxFilesPerTrigger` or `maxBytesPerTrigger` is set, then this parameter 
will be ignored,
+   * because old files that are valid, and should be processed, may be 
ignored. Please refer to
+   * SPARK-19813 for details.
    *
    * The max age is specified with respect to the timestamp of the latest 
file, and not the
    * timestamp of the current system. That this means if the last file has 
timestamp 1000, and the
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 4dad4c0adeac..eacbd0447d16 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.TimeUnit._
 
+import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -31,7 +32,7 @@ import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming
-import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, 
ReadLimit, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow}
+import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, 
ReadLimit, ReadMaxBytes, ReadMaxFiles, SupportsAdmissionControl, 
SupportsTriggerAvailableNow}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{DataSource, 
InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
@@ -87,6 +88,9 @@ class FileStreamSource(
   /** Maximum number of new files to be considered in each batch */
   private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger
 
+  /** Maximum number of new bytes to be considered in each batch */
+  private val maxBytesPerBatch = sourceOptions.maxBytesPerTrigger
+
   private val fileSortOrder = if (sourceOptions.latestFirst) {
       logWarning(
         """'latestFirst' is true. New files will be processed first, which may 
affect the watermark
@@ -96,7 +100,8 @@ class FileStreamSource(
       implicitly[Ordering[Long]]
     }
 
-  private val maxFileAgeMs: Long = if (sourceOptions.latestFirst && 
maxFilesPerBatch.isDefined) {
+  private val maxFileAgeMs: Long = if (sourceOptions.latestFirst &&
+      (maxFilesPerBatch.isDefined || maxBytesPerBatch.isDefined)) {
     Long.MaxValue
   } else {
     sourceOptions.maxFileAgeMs
@@ -113,16 +118,42 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = 
$maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   * Split files into a selected/unselected pair according to a total size 
threshold.
+   * Always puts the 1st element in a left split and keep adding it to a left 
split
+   * until reaches a specified threshold or [[Long.MaxValue]].
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long)
+    : (FilesSplit, FilesSplit) = {
+    var lSize = BigInt(0)
+    var rSize = BigInt(0)
+    val lFiles = ArrayBuffer[NewFileEntry]()
+    val rFiles = ArrayBuffer[NewFileEntry]()
+    for (i <- files.indices) {
+      val file = files(i)
+      val newSize = lSize + file.size
+      if (i == 0 || rFiles.isEmpty && newSize <= Long.MaxValue && newSize <= 
maxSize) {
+        lSize += file.size
+        lFiles += file
+      } else {
+        rSize += file.size
+        rFiles += file
+      }
+    }
+    (FilesSplit(lFiles.toSeq, lSize), FilesSplit(rFiles.toSeq, rSize))
+  }
 
   /**
    * Returns the maximum offset that can be retrieved from the source.
@@ -143,7 +174,7 @@ class FileStreamSource(
         fetchAllFiles()
       }
       allFiles.filter {
-        case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+        case NewFileEntry(path, _, timestamp) => seenFiles.isNewFile(path, 
timestamp)
       }
     }
 
@@ -152,7 +183,7 @@ class FileStreamSource(
       case files: ReadMaxFiles if !sourceOptions.latestFirst =>
         // we can cache and reuse remaining fetched list of files in further 
batches
         val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles())
-        if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_FILES_RATIO) {
+        if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_INPUT_RATIO) {
           // Discard unselected files if the number of files are smaller than 
threshold.
           // This is to avoid the case when the next batch would have too few 
files to read
           // whereas there're new files available.
@@ -166,6 +197,25 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh 
the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further 
batches
+        val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+          takeFilesUntilMax(newFiles, files.maxBytes())
+        if (rSize.toDouble < (files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) {
+          // Discard unselected files if the total size of files is smaller 
than threshold.
+          // This is to avoid the case when the next batch would have too 
small of a size of
+          // files to read whereas there're new files available.
+          logTrace(s"Discarding ${usFiles.length} unread files as it's smaller 
than threshold.")
+          (bFiles, null)
+        } else {
+          (bFiles, usFiles)
+        }
+
+      case files: ReadMaxBytes =>
+        val (FilesSplit(bFiles, _), _) = takeFilesUntilMax(newFiles, 
files.maxBytes())
+        // implies "sourceOptions.latestFirst = true" which we want to refresh 
the list per batch
+        (bFiles, null)
+
       case _: ReadAllAvailable => (newFiles, null)
     }
 
@@ -178,9 +228,9 @@ class FileStreamSource(
       logTrace(s"No unread file is available for further batches.")
     }
 
-    batchFiles.foreach { file =>
-      seenFiles.add(file._1, file._2)
-      logDebug(s"New file: $file")
+    batchFiles.foreach { case NewFileEntry(p, _, timestamp) =>
+      seenFiles.add(p, timestamp)
+      logDebug(s"New file: $p")
     }
     val numPurged = seenFiles.purge()
 
@@ -196,7 +246,7 @@ class FileStreamSource(
     if (batchFiles.nonEmpty) {
       metadataLogCurrentOffset += 1
 
-      val fileEntries = batchFiles.map { case (p, timestamp) =>
+      val fileEntries = batchFiles.map { case NewFileEntry(p, _, timestamp) =>
         FileEntry(path = p.urlEncoded, timestamp = timestamp, batchId = 
metadataLogCurrentOffset)
       }.toArray
       if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) {
@@ -215,7 +265,9 @@ class FileStreamSource(
   }
 
   override def getDefaultReadLimit: ReadLimit = {
-    
maxFilesPerBatch.map(ReadLimit.maxFiles).getOrElse(super.getDefaultReadLimit)
+    maxFilesPerBatch.map(ReadLimit.maxFiles).getOrElse(
+      
maxBytesPerBatch.map(ReadLimit.maxBytes).getOrElse(super.getDefaultReadLimit)
+    )
   }
 
   /**
@@ -290,7 +342,7 @@ class FileStreamSource(
   /**
    * Returns a list of files found, sorted by their timestamp.
    */
-  private def fetchAllFiles(): Seq[(SparkPath, Long)] = {
+  private def fetchAllFiles(): Seq[NewFileEntry] = {
     val startTime = System.nanoTime
 
     var allFiles: Seq[FileStatus] = null
@@ -322,7 +374,7 @@ class FileStreamSource(
     }
 
     val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { 
status =>
-      (SparkPath.fromFileStatus(status), status.getModificationTime)
+      NewFileEntry(SparkPath.fromFileStatus(status), status.getLen, 
status.getModificationTime)
     }
     val endTime = System.nanoTime
     val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
@@ -369,7 +421,7 @@ object FileStreamSource {
   /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
   type Timestamp = Long
 
-  val DISCARD_UNSEEN_FILES_RATIO = 0.2
+  val DISCARD_UNSEEN_INPUT_RATIO = 0.2
   val MAX_CACHED_UNSEEN_FILES = 10000
 
   case class FileEntry(
@@ -379,6 +431,11 @@ object FileStreamSource {
     def sparkPath: SparkPath = SparkPath.fromUrlString(path)
   }
 
+  /** Newly fetched files metadata holder. */
+  private case class NewFileEntry(path: SparkPath, size: Long, timestamp: Long)
+
+  private case class FilesSplit(files: Seq[NewFileEntry], size: BigInt)
+
   /**
    * A custom hash map used to track the list of files seen. This map is not 
thread-safe.
    *
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 1a69678c2f54..6e24e14fb1eb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -226,6 +226,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * <ul>
    * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number 
of new files to be
    * considered in every trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files
+   * to be considered in every trigger.</li>
    * </ul>
    *
    * You can find the JSON-specific options for reading JSON file stream in
@@ -250,6 +252,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * <ul>
    * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number 
of new files to be
    * considered in every trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files
+   * to be considered in every trigger.</li>
    * </ul>
    *
    * You can find the CSV-specific options for reading CSV file stream in
@@ -271,6 +275,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * <ul>
    * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number 
of new files to be
    * considered in every trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files
+   * to be considered in every trigger.</li>
    * </ul>
    *
    * You can find the XML-specific options for reading XML file stream in
@@ -291,6 +297,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * <ul>
    * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number 
of new files to be
    * considered in every trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files
+   * to be considered in every trigger.</li>
    * </ul>
    *
    * ORC-specific option(s) for reading ORC file stream can be found in
@@ -311,6 +319,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * <ul>
    * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number 
of new files to be
    * considered in every trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files
+   * to be considered in every trigger.</li>
    * </ul>
    *
    * Parquet-specific option(s) for reading Parquet file stream can be found in
@@ -359,6 +369,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * <ul>
    * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number 
of new files to be
    * considered in every trigger.</li>
+   * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total 
size of new files
+   * to be considered in every trigger.</li>
    * </ul>
    *
    * You can find the text-specific options for reading text files in
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b5cae70c47c6..fd3d59af7e6b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1154,25 +1154,33 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
   }
 
   test("max files per trigger") {
+    testThresholdLogic("maxFilesPerTrigger")
+  }
+
+  test("SPARK-46641: max bytes per trigger") {
+    testThresholdLogic("maxBytesPerTrigger")
+  }
+
+  private def testThresholdLogic(option: String): Unit = {
     withTempDir { case src =>
       var lastFileModTime: Option[Long] = None
 
       /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
+      def createFile(data: String): File = {
+        val file = stringToFile(new File(src, s"$data.txt"), data)
         if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get 
+ 1000)
         lastFileModTime = Some(file.lastModified)
         file
       }
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
+      createFile("a")
+      createFile("b")
+      createFile("c")
 
       // Set up a query to read text files 2 at a time
       val df = spark
         .readStream
-        .option("maxFilesPerTrigger", 2)
+        .option(option, 2)
         .text(src.getCanonicalPath)
       val q = df
         .writeStream
@@ -1186,14 +1194,14 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
       val fileSource = getSourcesFromStreamingQuery(q).head
 
       /** Check the data read in the last batch */
-      def checkLastBatchData(data: Int*): Unit = {
+      def checkLastBatchData(data: Char*): Unit = {
         val schema = StructType(Seq(StructField("value", StringType)))
         val df = spark.createDataFrame(
           spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
         checkAnswer(df, data.map(_.toString).toDF("value"))
       }
 
-      def checkAllData(data: Seq[Int]): Unit = {
+      def checkAllData(data: Seq[Char]): Unit = {
         val schema = StructType(Seq(StructField("value", StringType)))
         val df = spark.createDataFrame(
           spark.sparkContext.makeRDD(memorySink.allData), schema)
@@ -1208,45 +1216,53 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
         lastBatchId = memorySink.latestBatchId.get
       }
 
-      checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be 
in batch 2 (last)
-      checkAllData(1 to 3)
+      checkLastBatchData('c') // (a and b) should be in batch 1, (c) should be 
in batch 2 (last)
+      checkAllData('a' to 'c')
       lastBatchId = memorySink.latestBatchId.get
 
       fileSource.withBatchingLocked {
-        createFile(4)
-        createFile(5)   // 4 and 5 should be in a batch
-        createFile(6)
-        createFile(7)   // 6 and 7 should be in the last batch
+        createFile("d")
+        createFile("e") // d and e should be in a batch
+        createFile("f")
+        createFile("g") // f and g should be in the last batch
       }
       q.processAllAvailable()
       checkNumBatchesSinceLastCheck(2)
-      checkLastBatchData(6, 7)
-      checkAllData(1 to 7)
+      checkLastBatchData('f', 'g')
+      checkAllData('a' to 'g')
 
       fileSource.withBatchingLocked {
-        createFile(8)
-        createFile(9)    // 8 and 9 should be in a batch
-        createFile(10)
-        createFile(11)   // 10 and 11 should be in a batch
-        createFile(12)   // 12 should be in the last batch
+        createFile("h")
+        createFile("i") // h and i should be in a batch
+        createFile("j")
+        createFile("k") // j and k should be in a batch
+        createFile("l") // l should be in the last batch
       }
       q.processAllAvailable()
       checkNumBatchesSinceLastCheck(3)
-      checkLastBatchData(12)
-      checkAllData(1 to 12)
+      checkLastBatchData('l')
+      checkAllData('a' to 'l')
 
       q.stop()
     }
   }
 
   testQuietly("max files per trigger - incorrect values") {
-    val testTable = "maxFilesPerTrigger_test"
+    testIncorrectThresholdValues("maxFilesPerTrigger")
+  }
+
+  testQuietly("SPARK-46641: max files per trigger - incorrect values") {
+    testIncorrectThresholdValues("maxBytesPerTrigger")
+  }
+
+  private def testIncorrectThresholdValues(option: String): Unit = {
+    val testTable = s"${option}_test"
     withTable(testTable) {
       withTempDir { case src =>
-        def testMaxFilePerTriggerValue(value: String): Unit = {
-          val df = spark.readStream.option("maxFilesPerTrigger", 
value).text(src.getCanonicalPath)
+        def testIncorrectValue(value: String): Unit = {
+          val df = spark.readStream.option(option, 
value).text(src.getCanonicalPath)
           val e = intercept[StreamingQueryException] {
-            // Note: `maxFilesPerTrigger` is checked in the stream thread when 
creating the source
+            // Note: incorrect value is checked in the stream thread when 
creating the source
             val q = 
df.writeStream.format("memory").queryName(testTable).start()
             try {
               q.processAllAvailable()
@@ -1255,20 +1271,53 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
             }
           }
           assert(e.getCause.isInstanceOf[IllegalArgumentException])
-          Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
+          Seq(option, value, "positive integer").foreach { s =>
             assert(e.getMessage.contains(s))
           }
         }
 
-        testMaxFilePerTriggerValue("not-a-integer")
-        testMaxFilePerTriggerValue("-1")
-        testMaxFilePerTriggerValue("0")
-        testMaxFilePerTriggerValue("10.1")
+        testIncorrectValue("not-a-integer")
+        testIncorrectValue("-1")
+        testIncorrectValue("0")
+        testIncorrectValue("10.1")
+      }
+    }
+  }
+
+  testQuietly("SPARK-46641: max bytes per trigger & max files per trigger - 
both set") {
+    val testTable = "maxBytesPerTrigger_maxFilesPerTrigger_test"
+    withTable(testTable) {
+      withTempDir { case src =>
+        val df = spark.readStream
+          .option("maxBytesPerTrigger", "1")
+          .option("maxFilesPerTrigger", "1")
+          .text(src.getCanonicalPath)
+        val e = intercept[StreamingQueryException] {
+          // Note: a tested option is checked in the stream thread when 
creating the source
+          val q = df.writeStream.format("memory").queryName(testTable).start()
+          try {
+            q.processAllAvailable()
+          } finally {
+            q.stop()
+          }
+        }
+        assert(e.getCause.isInstanceOf[IllegalArgumentException])
+        Seq("maxBytesPerTrigger", "maxFilesPerTrigger", "can't be both 
set").foreach { s =>
+          assert(e.getMessage.contains(s))
+        }
       }
     }
   }
 
   test("SPARK-30669: maxFilesPerTrigger - ignored when using Trigger.Once") {
+    testIgnoreThresholdWithTriggerOnce("maxFilesPerTrigger")
+  }
+
+  test("SPARK-46641: maxBytesPerTrigger - ignored when using Trigger.Once") {
+    testIgnoreThresholdWithTriggerOnce("maxBytesPerTrigger")
+  }
+
+  private def testIgnoreThresholdWithTriggerOnce(optionName: String): Unit = {
     withTempDirs { (src, target) =>
       val checkpoint = new File(target, "chk").getCanonicalPath
       val targetDir = new File(target, "data").getCanonicalPath
@@ -1289,7 +1338,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       // Set up a query to read text files one at a time
       val df = spark
         .readStream
-        .option("maxFilesPerTrigger", 1)
+        .option(optionName, 1)
         .text(src.getCanonicalPath)
 
       def startQuery(): StreamingQuery = {
@@ -1718,8 +1767,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       secondBatch: String,
       maxFileAge: Option[String] = None,
       cleanSource: CleanSourceMode.Value = CleanSourceMode.OFF,
-      archiveDir: Option[String] = None): Unit = {
-    val srcOptions = Map("latestFirst" -> latestFirst.toString, 
"maxFilesPerTrigger" -> "1") ++
+      archiveDir: Option[String] = None,
+      thresholdOption: String = "maxFilesPerTrigger"): Unit = {
+    val srcOptions = Map("latestFirst" -> latestFirst.toString, 
thresholdOption -> "1") ++
       maxFileAge.map("maxFileAge" -> _) ++
       Seq("cleanSource" -> cleanSource.toString) ++
       archiveDir.map("sourceArchiveDir" -> _)
@@ -1777,6 +1827,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
{
     }
   }
 
+
+  test("SPARK-46641: Ignore maxFileAge when maxBytesPerTrigger and latestFirst 
is used") {
+    withTempDir { src =>
+      // Prepare two files: 1.txt, 2.txt, and make sure they have different 
modified time.
+      val f1 = stringToFile(new File(src, "1.txt"), "1")
+      val f2 = stringToFile(new File(src, "2.txt"), "2")
+      f2.setLastModified(f1.lastModified + 3600 * 1000 /* 1 hour later */)
+
+      runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", 
secondBatch = "1",
+        maxFileAge = Some("1m"), thresholdOption = "maxBytesPerTrigger")
+    }
+  }
+
   test("SeenFilesMap") {
     val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

