This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 618d6bf  [SPARK-27588] Binary file data source fails fast and doesn't attempt to read very large files
618d6bf is described below

commit 618d6bff71073c8c93501ab7392c3cc579730f0b
Author: Xiangrui Meng <m...@databricks.com>
AuthorDate: Mon Apr 29 16:24:49 2019 -0700

    [SPARK-27588] Binary file data source fails fast and doesn't attempt to read very large files
    
    ## What changes were proposed in this pull request?
    
    If a file is too big (>2GB), we should fail fast and not try to read the file.
    
    ## How was this patch tested?
    
    A unit test ("fail fast and do not attempt to read if a file is too big") was added to BinaryFileFormatSuite.
    
    Closes #24483 from mengxr/SPARK-27588.
    
    Authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  8 ++++++
 .../datasources/binaryfile/BinaryFileFormat.scala  |  8 ++++++
 .../binaryfile/BinaryFileFormatSuite.scala         | 31 +++++++++++++++++++++-
 3 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 96d3f5c..87bce1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1744,6 +1744,14 @@ object SQLConf {
       "and from_utc_timestamp() functions.")
     .booleanConf
     .createWithDefault(false)
+
+  val SOURCES_BINARY_FILE_MAX_LENGTH = buildConf("spark.sql.sources.binaryFile.maxLength")
+    .doc("The max length of a file that can be read by the binary file data source. " +
+      "Spark will fail fast and not attempt to read the file if its length exceeds this value. " +
+      "The theoretical max is Int.MaxValue, though VMs might implement a smaller max.")
+    .internal()
+    .intConf
+    .createWithDefault(Int.MaxValue)
 }
 
 /**
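As context for reviewers, a minimal sketch of how the new limit surfaces in a user session. It assumes a running SparkSession named `spark` and a hypothetical directory `/tmp/blobs`; the `binaryFile` short name and the config key come from this patch, and the check only fires when the `content` column is materialized, per the guard in BinaryFileFormat below:

```scala
// Sketch only: spark.sql.sources.binaryFile.maxLength is an internal conf,
// so setting it by name like this is debug/test usage, not a public knob.
spark.conf.set("spark.sql.sources.binaryFile.maxLength", "1024")

val df = spark.read.format("binaryFile").load("/tmp/blobs")

// Metadata-only projections are unaffected: the guard sits under the
// CONTENT case in the reader, not in file listing or schema handling.
df.select("path", "length").show()

// Materializing "content" for any file longer than 1024 bytes now throws a
// SparkException before the file's stream is opened.
df.select("content").collect()
```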
" + + "The theoretical max is Int.MaxValue, though VMs might implement a smaller max.") + .internal() + .intConf + .createWithDefault(Int.MaxValue) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala index db93268..2637784 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala @@ -24,11 +24,13 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, GlobFilter, Path} import org.apache.hadoop.mapreduce.Job +import org.apache.spark.SparkException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile} +import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH import org.apache.spark.sql.sources.{And, DataSourceRegister, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Not, Or} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -99,6 +101,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister { val binaryFileSourceOptions = new BinaryFileSourceOptions(options) val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter val filterFuncs = filters.map(filter => createFilterFunction(filter)) + val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) file: PartitionedFile => { val path = new Path(file.filePath) @@ -115,6 +118,11 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister { case (MODIFICATION_TIME, i) => writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime)) case (CONTENT, i) => + if (status.getLen > maxLength) { + throw new SparkException( + s"The length of ${status.getPath} is ${status.getLen}, " + + s"which exceeds the max length allowed: ${maxLength}.") + } val stream = fs.open(status.getPath) try { writer.write(i, ByteStreams.toByteArray(stream)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala index fb83c3c..01dc96c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala @@ -27,10 +27,12 @@ import com.google.common.io.{ByteStreams, Closeables} import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path} import org.mockito.Mockito.{mock, when} -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.SparkException +import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH import org.apache.spark.sql.sources._ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index fb83c3c..01dc96c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -27,10 +27,12 @@ import com.google.common.io.{ByteStreams, Closeables}
 import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf.SOURCES_BINARY_FILE_MAX_LENGTH
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
@@ -339,4 +341,31 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
     assert(df.select("LENGTH").first().getLong(0) === content.length,
       "column pruning should be case insensitive")
   }
+
+  test("fail fast and do not attempt to read if a file is too big") {
+    assert(spark.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) === Int.MaxValue)
+    withTempPath { file =>
+      val path = file.getPath
+      val content = "123".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+      def readContent(): DataFrame = {
+        spark.read.format(BINARY_FILE)
+          .load(path)
+          .select(CONTENT)
+      }
+      val expected = Seq(Row(content))
+      QueryTest.checkAnswer(readContent(), expected)
+      withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> content.length.toString) {
+        QueryTest.checkAnswer(readContent(), expected)
+      }
+      // Disable read. If the implementation attempts to read, the exception would be different.
+      file.setReadable(false)
+      val caught = intercept[SparkException] {
+        withSQLConf(SOURCES_BINARY_FILE_MAX_LENGTH.key -> (content.length - 1).toString) {
+          QueryTest.checkAnswer(readContent(), expected)
+        }
+      }
+      assert(caught.getMessage.contains("exceeds the max length allowed"))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org