This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3f5faaa24e3a [SPARK-46641][SS] Add maxBytesPerTrigger threshold 3f5faaa24e3a is described below commit 3f5faaa24e3ab4d9cc8f996bd1938573dd057e20 Author: maxim_konstantinov <maxim_konstanti...@apple.com> AuthorDate: Thu Feb 8 23:16:17 2024 -0800 [SPARK-46641][SS] Add maxBytesPerTrigger threshold ### What changes were proposed in this pull request? This PR adds [Input Streaming Source's](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources) option `maxBytesPerTrigger` for limiting the total size of input files in a streaming batch. Semantics of `maxBytesPerTrigger` is very close to already existing one `maxFilesPerTrigger` option. #### How a feature was implemented? Because `maxBytesPerTrigger` is semantically close to `maxFilesPerTrigger` I used all the `maxFilesPerTrigger` usages in the whole repository as a potential places that requires changes, that includes: - Option paramater definition - Option related logic - Option related ScalaDoc and MD files - Option related test I went over the usage of all usages of `maxFilesPerTrigger` in `FileStreamSourceSuite` and implemented `maxBytesPerTrigger` in the same fashion as those two are pretty close in their nature. From the structure and elements of ReadLimit I've concluded that current design implies only one simple rule for ReadLimit, so I openly prohibited the setting of both maxFilesPerTrigger and maxBytesPerTrigger at the same time. ### Why are the changes needed? This feature is useful for our and our sister teams and we expect it will find a broad acceptance among Spark users. We have a use-case in a few of the Spark pipelines we support when we use Available-now trigger for periodic processing using Spark Streaming. We use `maxFilesPerTrigger` threshold for now, but this is not ideal as Input file size might change with the time which requires periodic configuration adjustment of `maxFilesPerTrigger`. Computational complexity of the job depe [...] ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? New unit tests were added or existing `maxFilesPerTrigger` test were extended. I searched `maxFilesPerTrigger` related test and added new tests or extended existing ones trying to minimize and simplify the changes. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44636 from MaxNevermind/streaming-add-maxBytesPerTrigger-option. Lead-authored-by: maxim_konstantinov <maxim_konstanti...@apple.com> Co-authored-by: Max Konstantinov <konstantinov.ma...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/streaming/DataStreamReader.scala | 24 +++- docs/structured-streaming-programming-guide.md | 8 +- .../sql/connector/read/streaming/ReadLimit.java | 2 + .../{ReadLimit.java => ReadMaxBytes.java} | 39 +++--- .../execution/streaming/FileStreamOptions.scala | 18 ++- .../sql/execution/streaming/FileStreamSource.scala | 87 +++++++++++--- .../spark/sql/streaming/DataStreamReader.scala | 12 ++ .../sql/streaming/FileStreamSourceSuite.scala | 133 +++++++++++++++------ 8 files changed, 247 insertions(+), 76 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index bc8e30cd300c..789425c9daea 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -159,7 +159,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L * schema in advance, use the version that specifies the schema to avoid the extra scan. * * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): - * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * sets the maximum number of new files to be considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to + * be considered in every trigger.</li> </ul> * * You can find the JSON-specific options for reading JSON file stream in <a * href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> @@ -179,7 +181,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L * specify the schema explicitly using `schema`. * * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): - * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * sets the maximum number of new files to be considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to + * be considered in every trigger.</li> </ul> * * You can find the CSV-specific options for reading CSV file stream in <a * href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option"> @@ -197,7 +201,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L * specify the schema explicitly using `schema`. * * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): - * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * sets the maximum number of new files to be considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to + * be considered in every trigger.</li> </ul> * * You can find the XML-specific options for reading XML file stream in <a * href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> @@ -211,7 +217,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L * Loads a ORC file stream, returning the result as a `DataFrame`. * * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): - * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * sets the maximum number of new files to be considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to + * be considered in every trigger.</li> </ul> * * ORC-specific option(s) for reading ORC file stream can be found in <a href= * "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option"> Data @@ -225,7 +233,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L * Loads a Parquet file stream, returning the result as a `DataFrame`. * * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): - * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * sets the maximum number of new files to be considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to + * be considered in every trigger.</li> </ul> * * Parquet-specific option(s) for reading Parquet file stream can be found in <a href= * "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option"> Data @@ -268,7 +278,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L * }}} * * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit): - * sets the maximum number of new files to be considered in every trigger.</li> </ul> + * sets the maximum number of new files to be considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to + * be considered in every trigger.</li> </ul> * * You can find the text-specific options for reading text files in <a * href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option"> diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 7a4249f9d6fc..2dc7272a4f15 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -561,6 +561,8 @@ Here are the details of all the sources in Spark. <br/> <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max) <br/> + <code>maxBytesPerTrigger</code>: maximum total size of new files to be considered in every trigger (default: no max). <code>maxBytesPerTrigger</code> and <code>maxFilesPerTrigger</code> can't both be set at the same time, only one of two must be chosen. Note that a stream always reads at least one file so it can make progress and not get stuck on a file larger than a given maximum. + <br/> <code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files (default: false) <br/> <code>fileNameOnly</code>: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same: @@ -570,7 +572,7 @@ Here are the details of all the sources in Spark. "s3n://a/b/dataset.txt"<br/> "s3a://a/b/c/dataset.txt" <br/> - <code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to `true` and <code>maxFilesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(d [...] + <code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to `true` and <code>maxFilesPerTrigger</code> or <code>maxBytesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the [...] <br/> <code>cleanSource</code>: option to clean up completed files after processing.<br/> Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/> @@ -3272,8 +3274,8 @@ Here are the different kinds of triggers that are supported. <td> Similar to queries one-time micro-batch trigger, the query will process all the available data and then stop on its own. The difference is that, it will process the data in (possibly) multiple micro-batches - based on the source options (e.g. <code>maxFilesPerTrigger</code> for file source), which will result - in better query scalability. + based on the source options (e.g. <code>maxFilesPerTrigger</code> or <code>maxBytesPerTrigger</code> for file + source), which will result in better query scalability. <ul> <li>This trigger provides a strong guarantee of processing: regardless of how many batches were left over in previous run, it ensures all available data at the time of execution gets diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java index 538adeb4ad79..a36412feee02 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java @@ -39,6 +39,8 @@ public interface ReadLimit { static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); } + static ReadLimit maxBytes(long bytes) { return new ReadMaxBytes(bytes); } + static ReadLimit allAvailable() { return ReadAllAvailable.INSTANCE; } static ReadLimit compositeLimit(ReadLimit[] readLimits) { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java similarity index 53% copy from sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java index 538adeb4ad79..2e3c5627af4a 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java @@ -20,28 +20,37 @@ package org.apache.spark.sql.connector.read.streaming; import org.apache.spark.annotation.Evolving; /** - * Interface representing limits on how much to read from a {@link MicroBatchStream} when it - * implements {@link SupportsAdmissionControl}. There are several child interfaces representing - * various kinds of limits. + * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan files which total + * size doesn't go beyond a given maximum total size. Always reads at least one file so a stream + * can make progress and not get stuck on a file larger than a given maximum. * * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit) - * @see ReadAllAvailable - * @see ReadMaxRows - * @since 3.0.0 + * @since 4.0.0 */ @Evolving -public interface ReadLimit { - static ReadLimit minRows(long rows, long maxTriggerDelayMs) { - return new ReadMinRows(rows, maxTriggerDelayMs); - } +public class ReadMaxBytes implements ReadLimit { + private long bytes; - static ReadLimit maxRows(long rows) { return new ReadMaxRows(rows); } + ReadMaxBytes(long bytes) { + this.bytes = bytes; + } - static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); } + /** Maximum total size of files to scan. */ + public long maxBytes() { return this.bytes; } - static ReadLimit allAvailable() { return ReadAllAvailable.INSTANCE; } + @Override + public String toString() { + return "MaxBytes: " + maxBytes(); + } - static ReadLimit compositeLimit(ReadLimit[] readLimits) { - return new CompositeReadLimit(readLimits); + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReadMaxBytes other = (ReadMaxBytes) o; + return other.maxBytes() == maxBytes(); } + + @Override + public int hashCode() { return Long.hashCode(bytes); } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index ae0909559086..07c1ccc432cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -50,11 +50,25 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging } } + val maxBytesPerTrigger: Option[Long] = parameters.get("maxBytesPerTrigger").map { str => + Try(str.toLong).toOption.filter(_ > 0).map(op => + if (maxFilesPerTrigger.nonEmpty) { + throw new IllegalArgumentException( + "Options 'maxFilesPerTrigger' and 'maxBytesPerTrigger' " + + "can't be both set at the same time") + } else op + ).getOrElse { + throw new IllegalArgumentException( + s"Invalid value '$str' for option 'maxBytesPerTrigger', must be a positive integer") + } + } + /** * Maximum age of a file that can be found in this directory, before it is ignored. For the * first batch all files will be considered valid. If `latestFirst` is set to `true` and - * `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are - * valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details. + * `maxFilesPerTrigger` or `maxBytesPerTrigger` is set, then this parameter will be ignored, + * because old files that are valid, and should be processed, may be ignored. Please refer to + * SPARK-19813 for details. * * The max age is specified with respect to the timestamp of the latest file, and not the * timestamp of the current system. That this means if the last file has timestamp 1000, and the diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 4dad4c0adeac..eacbd0447d16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit._ +import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration @@ -31,7 +32,7 @@ import org.apache.spark.paths.SparkPath import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.read.streaming -import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow} +import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxBytes, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.internal.SQLConf @@ -87,6 +88,9 @@ class FileStreamSource( /** Maximum number of new files to be considered in each batch */ private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger + /** Maximum number of new bytes to be considered in each batch */ + private val maxBytesPerBatch = sourceOptions.maxBytesPerTrigger + private val fileSortOrder = if (sourceOptions.latestFirst) { logWarning( """'latestFirst' is true. New files will be processed first, which may affect the watermark @@ -96,7 +100,8 @@ class FileStreamSource( implicitly[Ordering[Long]] } - private val maxFileAgeMs: Long = if (sourceOptions.latestFirst && maxFilesPerBatch.isDefined) { + private val maxFileAgeMs: Long = if (sourceOptions.latestFirst && + (maxFilesPerBatch.isDefined || maxBytesPerBatch.isDefined)) { Long.MaxValue } else { sourceOptions.maxFileAgeMs @@ -113,16 +118,42 @@ class FileStreamSource( // Visible for testing and debugging in production. val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly) - private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _ + private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _ metadataLog.restore().foreach { entry => seenFiles.add(entry.sparkPath, entry.timestamp) } seenFiles.purge() - logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs") + logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " + + s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs") + + private var unreadFiles: Seq[NewFileEntry] = _ - private var unreadFiles: Seq[(SparkPath, Long)] = _ + /** + * Split files into a selected/unselected pair according to a total size threshold. + * Always puts the 1st element in a left split and keep adding it to a left split + * until reaches a specified threshold or [[Long.MaxValue]]. + */ + private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long) + : (FilesSplit, FilesSplit) = { + var lSize = BigInt(0) + var rSize = BigInt(0) + val lFiles = ArrayBuffer[NewFileEntry]() + val rFiles = ArrayBuffer[NewFileEntry]() + for (i <- files.indices) { + val file = files(i) + val newSize = lSize + file.size + if (i == 0 || rFiles.isEmpty && newSize <= Long.MaxValue && newSize <= maxSize) { + lSize += file.size + lFiles += file + } else { + rSize += file.size + rFiles += file + } + } + (FilesSplit(lFiles.toSeq, lSize), FilesSplit(rFiles.toSeq, rSize)) + } /** * Returns the maximum offset that can be retrieved from the source. @@ -143,7 +174,7 @@ class FileStreamSource( fetchAllFiles() } allFiles.filter { - case (path, timestamp) => seenFiles.isNewFile(path, timestamp) + case NewFileEntry(path, _, timestamp) => seenFiles.isNewFile(path, timestamp) } } @@ -152,7 +183,7 @@ class FileStreamSource( case files: ReadMaxFiles if !sourceOptions.latestFirst => // we can cache and reuse remaining fetched list of files in further batches val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles()) - if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_FILES_RATIO) { + if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_INPUT_RATIO) { // Discard unselected files if the number of files are smaller than threshold. // This is to avoid the case when the next batch would have too few files to read // whereas there're new files available. @@ -166,6 +197,25 @@ class FileStreamSource( // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch (newFiles.take(files.maxFiles()), null) + case files: ReadMaxBytes if !sourceOptions.latestFirst => + // we can cache and reuse remaining fetched list of files in further batches + val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) = + takeFilesUntilMax(newFiles, files.maxBytes()) + if (rSize.toDouble < (files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) { + // Discard unselected files if the total size of files is smaller than threshold. + // This is to avoid the case when the next batch would have too small of a size of + // files to read whereas there're new files available. + logTrace(s"Discarding ${usFiles.length} unread files as it's smaller than threshold.") + (bFiles, null) + } else { + (bFiles, usFiles) + } + + case files: ReadMaxBytes => + val (FilesSplit(bFiles, _), _) = takeFilesUntilMax(newFiles, files.maxBytes()) + // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch + (bFiles, null) + case _: ReadAllAvailable => (newFiles, null) } @@ -178,9 +228,9 @@ class FileStreamSource( logTrace(s"No unread file is available for further batches.") } - batchFiles.foreach { file => - seenFiles.add(file._1, file._2) - logDebug(s"New file: $file") + batchFiles.foreach { case NewFileEntry(p, _, timestamp) => + seenFiles.add(p, timestamp) + logDebug(s"New file: $p") } val numPurged = seenFiles.purge() @@ -196,7 +246,7 @@ class FileStreamSource( if (batchFiles.nonEmpty) { metadataLogCurrentOffset += 1 - val fileEntries = batchFiles.map { case (p, timestamp) => + val fileEntries = batchFiles.map { case NewFileEntry(p, _, timestamp) => FileEntry(path = p.urlEncoded, timestamp = timestamp, batchId = metadataLogCurrentOffset) }.toArray if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) { @@ -215,7 +265,9 @@ class FileStreamSource( } override def getDefaultReadLimit: ReadLimit = { - maxFilesPerBatch.map(ReadLimit.maxFiles).getOrElse(super.getDefaultReadLimit) + maxFilesPerBatch.map(ReadLimit.maxFiles).getOrElse( + maxBytesPerBatch.map(ReadLimit.maxBytes).getOrElse(super.getDefaultReadLimit) + ) } /** @@ -290,7 +342,7 @@ class FileStreamSource( /** * Returns a list of files found, sorted by their timestamp. */ - private def fetchAllFiles(): Seq[(SparkPath, Long)] = { + private def fetchAllFiles(): Seq[NewFileEntry] = { val startTime = System.nanoTime var allFiles: Seq[FileStatus] = null @@ -322,7 +374,7 @@ class FileStreamSource( } val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status => - (SparkPath.fromFileStatus(status), status.getModificationTime) + NewFileEntry(SparkPath.fromFileStatus(status), status.getLen, status.getModificationTime) } val endTime = System.nanoTime val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime) @@ -369,7 +421,7 @@ object FileStreamSource { /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */ type Timestamp = Long - val DISCARD_UNSEEN_FILES_RATIO = 0.2 + val DISCARD_UNSEEN_INPUT_RATIO = 0.2 val MAX_CACHED_UNSEEN_FILES = 10000 case class FileEntry( @@ -379,6 +431,11 @@ object FileStreamSource { def sparkPath: SparkPath = SparkPath.fromUrlString(path) } + /** Newly fetched files metadata holder. */ + private case class NewFileEntry(path: SparkPath, size: Long, timestamp: Long) + + private case class FilesSplit(files: Seq[NewFileEntry], size: BigInt) + /** * A custom hash map used to track the list of files seen. This map is not thread-safe. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 1a69678c2f54..6e24e14fb1eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -226,6 +226,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * <ul> * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be * considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files + * to be considered in every trigger.</li> * </ul> * * You can find the JSON-specific options for reading JSON file stream in @@ -250,6 +252,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * <ul> * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be * considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files + * to be considered in every trigger.</li> * </ul> * * You can find the CSV-specific options for reading CSV file stream in @@ -271,6 +275,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * <ul> * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be * considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files + * to be considered in every trigger.</li> * </ul> * * You can find the XML-specific options for reading XML file stream in @@ -291,6 +297,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * <ul> * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be * considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files + * to be considered in every trigger.</li> * </ul> * * ORC-specific option(s) for reading ORC file stream can be found in @@ -311,6 +319,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * <ul> * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be * considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files + * to be considered in every trigger.</li> * </ul> * * Parquet-specific option(s) for reading Parquet file stream can be found in @@ -359,6 +369,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * <ul> * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be * considered in every trigger.</li> + * <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files + * to be considered in every trigger.</li> * </ul> * * You can find the text-specific options for reading text files in diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index b5cae70c47c6..fd3d59af7e6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1154,25 +1154,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } test("max files per trigger") { + testThresholdLogic("maxFilesPerTrigger") + } + + test("SPARK-46641: max bytes per trigger") { + testThresholdLogic("maxBytesPerTrigger") + } + + private def testThresholdLogic(option: String): Unit = { withTempDir { case src => var lastFileModTime: Option[Long] = None /** Create a text file with a single data item */ - def createFile(data: Int): File = { - val file = stringToFile(new File(src, s"$data.txt"), data.toString) + def createFile(data: String): File = { + val file = stringToFile(new File(src, s"$data.txt"), data) if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000) lastFileModTime = Some(file.lastModified) file } - createFile(1) - createFile(2) - createFile(3) + createFile("a") + createFile("b") + createFile("c") // Set up a query to read text files 2 at a time val df = spark .readStream - .option("maxFilesPerTrigger", 2) + .option(option, 2) .text(src.getCanonicalPath) val q = df .writeStream @@ -1186,14 +1194,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val fileSource = getSourcesFromStreamingQuery(q).head /** Check the data read in the last batch */ - def checkLastBatchData(data: Int*): Unit = { + def checkLastBatchData(data: Char*): Unit = { val schema = StructType(Seq(StructField("value", StringType))) val df = spark.createDataFrame( spark.sparkContext.makeRDD(memorySink.latestBatchData), schema) checkAnswer(df, data.map(_.toString).toDF("value")) } - def checkAllData(data: Seq[Int]): Unit = { + def checkAllData(data: Seq[Char]): Unit = { val schema = StructType(Seq(StructField("value", StringType))) val df = spark.createDataFrame( spark.sparkContext.makeRDD(memorySink.allData), schema) @@ -1208,45 +1216,53 @@ class FileStreamSourceSuite extends FileStreamSourceTest { lastBatchId = memorySink.latestBatchId.get } - checkLastBatchData(3) // (1 and 2) should be in batch 1, (3) should be in batch 2 (last) - checkAllData(1 to 3) + checkLastBatchData('c') // (a and b) should be in batch 1, (c) should be in batch 2 (last) + checkAllData('a' to 'c') lastBatchId = memorySink.latestBatchId.get fileSource.withBatchingLocked { - createFile(4) - createFile(5) // 4 and 5 should be in a batch - createFile(6) - createFile(7) // 6 and 7 should be in the last batch + createFile("d") + createFile("e") // d and e should be in a batch + createFile("f") + createFile("g") // f and g should be in the last batch } q.processAllAvailable() checkNumBatchesSinceLastCheck(2) - checkLastBatchData(6, 7) - checkAllData(1 to 7) + checkLastBatchData('f', 'g') + checkAllData('a' to 'g') fileSource.withBatchingLocked { - createFile(8) - createFile(9) // 8 and 9 should be in a batch - createFile(10) - createFile(11) // 10 and 11 should be in a batch - createFile(12) // 12 should be in the last batch + createFile("h") + createFile("i") // h and i should be in a batch + createFile("j") + createFile("k") // j and k should be in a batch + createFile("l") // l should be in the last batch } q.processAllAvailable() checkNumBatchesSinceLastCheck(3) - checkLastBatchData(12) - checkAllData(1 to 12) + checkLastBatchData('l') + checkAllData('a' to 'l') q.stop() } } testQuietly("max files per trigger - incorrect values") { - val testTable = "maxFilesPerTrigger_test" + testIncorrectThresholdValues("maxFilesPerTrigger") + } + + testQuietly("SPARK-46641: max files per trigger - incorrect values") { + testIncorrectThresholdValues("maxBytesPerTrigger") + } + + private def testIncorrectThresholdValues(option: String): Unit = { + val testTable = s"${option}_test" withTable(testTable) { withTempDir { case src => - def testMaxFilePerTriggerValue(value: String): Unit = { - val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath) + def testIncorrectValue(value: String): Unit = { + val df = spark.readStream.option(option, value).text(src.getCanonicalPath) val e = intercept[StreamingQueryException] { - // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source + // Note: incorrect value is checked in the stream thread when creating the source val q = df.writeStream.format("memory").queryName(testTable).start() try { q.processAllAvailable() @@ -1255,20 +1271,53 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } assert(e.getCause.isInstanceOf[IllegalArgumentException]) - Seq("maxFilesPerTrigger", value, "positive integer").foreach { s => + Seq(option, value, "positive integer").foreach { s => assert(e.getMessage.contains(s)) } } - testMaxFilePerTriggerValue("not-a-integer") - testMaxFilePerTriggerValue("-1") - testMaxFilePerTriggerValue("0") - testMaxFilePerTriggerValue("10.1") + testIncorrectValue("not-a-integer") + testIncorrectValue("-1") + testIncorrectValue("0") + testIncorrectValue("10.1") + } + } + } + + testQuietly("SPARK-46641: max bytes per trigger & max files per trigger - both set") { + val testTable = "maxBytesPerTrigger_maxFilesPerTrigger_test" + withTable(testTable) { + withTempDir { case src => + val df = spark.readStream + .option("maxBytesPerTrigger", "1") + .option("maxFilesPerTrigger", "1") + .text(src.getCanonicalPath) + val e = intercept[StreamingQueryException] { + // Note: a tested option is checked in the stream thread when creating the source + val q = df.writeStream.format("memory").queryName(testTable).start() + try { + q.processAllAvailable() + } finally { + q.stop() + } + } + assert(e.getCause.isInstanceOf[IllegalArgumentException]) + Seq("maxBytesPerTrigger", "maxFilesPerTrigger", "can't be both set").foreach { s => + assert(e.getMessage.contains(s)) + } } } } test("SPARK-30669: maxFilesPerTrigger - ignored when using Trigger.Once") { + testIgnoreThresholdWithTriggerOnce("maxFilesPerTrigger") + } + + test("SPARK-46641: maxBytesPerTrigger - ignored when using Trigger.Once") { + testIgnoreThresholdWithTriggerOnce("maxBytesPerTrigger") + } + + private def testIgnoreThresholdWithTriggerOnce(optionName: String): Unit = { withTempDirs { (src, target) => val checkpoint = new File(target, "chk").getCanonicalPath val targetDir = new File(target, "data").getCanonicalPath @@ -1289,7 +1338,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { // Set up a query to read text files one at a time val df = spark .readStream - .option("maxFilesPerTrigger", 1) + .option(optionName, 1) .text(src.getCanonicalPath) def startQuery(): StreamingQuery = { @@ -1718,8 +1767,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest { secondBatch: String, maxFileAge: Option[String] = None, cleanSource: CleanSourceMode.Value = CleanSourceMode.OFF, - archiveDir: Option[String] = None): Unit = { - val srcOptions = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1") ++ + archiveDir: Option[String] = None, + thresholdOption: String = "maxFilesPerTrigger"): Unit = { + val srcOptions = Map("latestFirst" -> latestFirst.toString, thresholdOption -> "1") ++ maxFileAge.map("maxFileAge" -> _) ++ Seq("cleanSource" -> cleanSource.toString) ++ archiveDir.map("sourceArchiveDir" -> _) @@ -1777,6 +1827,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + + test("SPARK-46641: Ignore maxFileAge when maxBytesPerTrigger and latestFirst is used") { + withTempDir { src => + // Prepare two files: 1.txt, 2.txt, and make sure they have different modified time. + val f1 = stringToFile(new File(src, "1.txt"), "1") + val f2 = stringToFile(new File(src, "2.txt"), "2") + f2.setLastModified(f1.lastModified + 3600 * 1000 /* 1 hour later */) + + runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1", + maxFileAge = Some("1m"), thresholdOption = "maxBytesPerTrigger") + } + } + test("SeenFilesMap") { val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org