This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 11e7ea4f11d  [SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error
11e7ea4f11d is described below

commit 11e7ea4f11df71e2942322b01fcaab57dac20c83
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Wed Oct 18 11:06:43 2023 +0500

    [SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error

### What changes were proposed in this pull request?
Fix `ignoreCorruptFiles`/`ignoreMissingFiles` reporting an error when used with multiline CSV/JSON. The failure looked like this:

```log
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4940.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4940.0 (TID 4031) (10.68.177.106 executor 0): com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input
Parser Configuration: CsvParserSettings:
    Auto configuration enabled=true
    Auto-closing enabled=true
    Autodetect column delimiter=false
    Autodetect quotes=false
    Column reordering enabled=true
    Delimiters for detection=null
    Empty value=
    Escape unquoted values=false
    Header extraction enabled=null
    Headers=null
    Ignore leading whitespaces=false
    Ignore leading whitespaces in quotes=false
    Ignore trailing whitespaces=false
    Ignore trailing whitespaces in quotes=false
    Input buffer size=1048576
    Input reading on separate thread=false
    Keep escape sequences=false
    Keep quotes=false
    Length of content displayed on error=1000
    Line separator detection enabled=true
    Maximum number of characters per column=-1
    Maximum number of columns=20480
    Normalize escaped line separators=true
    Null value=
    Number of records to read=all
    Processor=none
    Restricting data in exceptions=false
    RowProcessor error handler=null
    Selected fields=none
    Skip bits as whitespace=true
    Skip empty lines=true
    Unescaped quote handling=STOP_AT_DELIMITER
Format configuration: CsvFormat:
    Comment character=#
    Field delimiter=,
    Line separator (normalized)=\n
    Line separator sequence=\n
    Quote character="
    Quote escape character=\
    Quote escape escape character=null
Internal state when error was thrown: line=0, column=0, record=0
    at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:402)
    at com.univocity.parsers.common.AbstractParser.beginParsing(AbstractParser.java:277)
    at com.univocity.parsers.common.AbstractParser.beginParsing(AbstractParser.java:843)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser$$anon$1.<init>(UnivocityParser.scala:463)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser$.convertStream(UnivocityParser.scala:46...
```

This happens because multiline CSV/JSON uses `BinaryFileRDD`, not `FileScanRDD`. Unlike `FileScanRDD`, which checks the `ignoreCorruptFiles` config when it meets a corrupt file so that no `IOException` is reported, `BinaryFileRDD` does not surface the error, because it simply returns a normal `PortableDataStream`. So the exception should be caught inside the schema-inference lambda function instead, and the same is done for `ignoreMissingFiles`.
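For illustration, here is a minimal, self-contained sketch of that catch-and-skip pattern. The names `SkipBadFilesSketch`, `inferOneFile`, and `readFile` are hypothetical placeholders for this note, not Spark APIs; the real changes live in `JsonInferSchema.infer` and `MultiLineCSVDataSource.infer` in the diff below:

```scala
import java.io.{FileNotFoundException, IOException}

// Illustrative sketch only: mirrors the guard this patch wraps around
// per-file tokenization during schema inference, so that corrupt or
// missing files contribute nothing instead of failing the whole job.
object SkipBadFilesSketch {
  def inferOneFile(
      readFile: () => Array[Array[String]], // hypothetical per-file tokenizer
      ignoreCorruptFiles: Boolean,
      ignoreMissingFiles: Boolean): Array[Array[String]] = {
    try {
      readFile()
    } catch {
      case e: FileNotFoundException if !ignoreMissingFiles =>
        throw e // flag off: keep failing the job, as before
      case _: FileNotFoundException =>
        // The file vanished between listing and reading: skip it.
        Array.empty[Array[String]]
      case _: RuntimeException | _: IOException if ignoreCorruptFiles =>
        // Corrupt content (e.g. a truncated gzip stream): skip the rest.
        Array.empty[Array[String]]
      // Anything else still propagates and fails the job.
    }
  }

  def main(args: Array[String]): Unit = {
    // A reader that always throws, like parsing a truncated gzip file.
    val tokens = inferOneFile(
      () => throw new IOException("Unexpected end of input stream"),
      ignoreCorruptFiles = true,
      ignoreMissingFiles = false)
    assert(tokens.isEmpty) // the corrupt file contributes no rows
  }
}
```

Returning an empty result is what lets the callers shown in the diff (`reduceOption(typeMerger)` on the JSON side, `take(1).headOption` on the CSV side) treat a skipped file as contributing no rows to inference.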
### Why are the changes needed?
Fixes the bug when using multiline mode with the `ignoreCorruptFiles`/`ignoreMissingFiles` configs.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added new tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #42979 from Hisoka-X/SPARK-45035_csv_multi_line.

Authored-by: Jia Fan <fanjiaemi...@qq.com>
Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/catalyst/json/JsonInferSchema.scala  | 18 +++++--
 .../execution/datasources/csv/CSVDataSource.scala  | 28 ++++++++---
 .../datasources/CommonFileDataSourceSuite.scala    | 28 +++++++++++
 .../sql/execution/datasources/csv/CSVSuite.scala   | 58 +++++++++++++---------
 .../sql/execution/datasources/json/JsonSuite.scala | 46 ++++++++++++++++-
 5 files changed, 142 insertions(+), 36 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 4123c5290b6..4d04b34876c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.json
 
-import java.io.CharConversionException
+import java.io.{CharConversionException, FileNotFoundException, IOException}
 import java.nio.charset.MalformedInputException
 import java.util.Comparator
 
@@ -25,6 +25,7 @@ import scala.util.control.Exception.allCatch
 
 import com.fasterxml.jackson.core._
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
@@ -36,7 +37,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
+private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
 
   private val decimalParser = ExprUtils.getDecimalParser(options.locale)
 
@@ -53,6 +54,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
     isParsing = true,
     forTimestampNTZ = true)
 
+  private val ignoreCorruptFiles = options.ignoreCorruptFiles
+  private val ignoreMissingFiles = options.ignoreMissingFiles
+
   private def handleJsonErrorsByParseMode(parseMode: ParseMode,
       columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
     parseMode match {
@@ -88,8 +92,7 @@
           Some(inferField(parser))
         }
       } catch {
-        case e @ (_: RuntimeException | _: JsonProcessingException |
-            _: MalformedInputException) =>
+        case e @ (_: JsonProcessingException | _: MalformedInputException) =>
           handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, e)
         case e: CharConversionException if options.encoding.isEmpty =>
           val msg =
@@ -99,6 +102,13 @@
           val wrappedCharException = new CharConversionException(msg)
           wrappedCharException.initCause(e)
           handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, wrappedCharException)
+        case e: FileNotFoundException if ignoreMissingFiles =>
+          logWarning("Skipped missing file", e)
+          Some(StructType(Nil))
+        case e: FileNotFoundException if !ignoreMissingFiles => throw e
+        case e @ (_: IOException | _: RuntimeException) if ignoreCorruptFiles =>
+          logWarning("Skipped the rest of the content in the corrupted file", e)
+          Some(StructType(Nil))
       }
     }.reduceOption(typeMerger).iterator
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 99d43953c4c..cf7c536bdae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
+import java.io.{FileNotFoundException, IOException}
 import java.nio.charset.{Charset, StandardCharsets}
 
 import com.univocity.parsers.csv.CsvParser
@@ -168,7 +169,7 @@ object TextInputCSVDataSource extends CSVDataSource {
   }
 }
 
-object MultiLineCSVDataSource extends CSVDataSource {
+object MultiLineCSVDataSource extends CSVDataSource with Logging {
   override val isSplitable: Boolean = false
 
   override def readFile(
@@ -189,13 +190,26 @@ object MultiLineCSVDataSource extends CSVDataSource {
       inputPaths: Seq[FileStatus],
       parsedOptions: CSVOptions): StructType = {
     val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
+    val ignoreCorruptFiles = parsedOptions.ignoreCorruptFiles
+    val ignoreMissingFiles = parsedOptions.ignoreMissingFiles
     csv.flatMap { lines =>
-      val path = new Path(lines.getPath())
-      UnivocityParser.tokenizeStream(
-        CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
-        shouldDropHeader = false,
-        new CsvParser(parsedOptions.asParserSettings),
-        encoding = parsedOptions.charset)
+      try {
+        val path = new Path(lines.getPath())
+        UnivocityParser.tokenizeStream(
+          CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
+          shouldDropHeader = false,
+          new CsvParser(parsedOptions.asParserSettings),
+          encoding = parsedOptions.charset)
+      } catch {
+        case e: FileNotFoundException if ignoreMissingFiles =>
+          logWarning(s"Skipped missing file: ${lines.getPath()}", e)
+          Array.empty[Array[String]]
+        case e: FileNotFoundException if !ignoreMissingFiles => throw e
+        case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+          logWarning(
+            s"Skipped the rest of the content in the corrupted file: ${lines.getPath()}", e)
+          Array.empty[Array[String]]
+      }
     }.take(1).headOption match {
       case Some(firstRow) =>
         val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
index 739f4c440be..2e3a4bbafb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.{ByteArrayOutputStream, File, FileOutputStream}
+import java.util.zip.GZIPOutputStream
+
 import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
 
 import org.apache.spark.sql.{Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
@@ -60,4 +63,29 @@ trait CommonFileDataSourceSuite extends SQLHelper {
       }
     }
   }
+
+  protected def withCorruptFile(f: File => Unit): Unit = {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+      f(inputFile)
+    } finally {
+      inputFile.delete()
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 7655635fc62..38fbf466882 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
-import java.io.{ByteArrayOutputStream, EOFException, File, FileOutputStream}
+import java.io.{EOFException, File}
 import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
 import java.nio.file.{Files, StandardOpenOption}
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
 import java.util.Locale
-import java.util.zip.GZIPOutputStream
 
 import scala.jdk.CollectionConverters._
 import scala.util.Properties
@@ -32,11 +31,12 @@ import scala.util.Properties
 import com.univocity.parsers.common.TextParsingException
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.commons.lang3.time.FastDateFormat
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.logging.log4j.Level
 
-import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUpgradeException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUpgradeException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
@@ -1447,37 +1447,47 @@ abstract class CSVSuite
     }
   }
 
-  test("Enabling/disabling ignoreCorruptFiles") {
-    val inputFile = File.createTempFile("input-", ".gz")
-    try {
-      // Create a corrupt gzip file
-      val byteOutput = new ByteArrayOutputStream()
-      val gzip = new GZIPOutputStream(byteOutput)
-      try {
-        gzip.write(Array[Byte](1, 2, 3, 4))
-      } finally {
-        gzip.close()
-      }
-      val bytes = byteOutput.toByteArray
-      val o = new FileOutputStream(inputFile)
-      try {
-        // It's corrupt since we only write half of bytes into the file.
-        o.write(bytes.take(bytes.length / 2))
-      } finally {
-        o.close()
-      }
+  test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
       withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
         val e = intercept[SparkException] {
           spark.read.csv(inputFile.toURI.toString).collect()
         }
         assert(e.getCause.getCause.isInstanceOf[EOFException])
         assert(e.getCause.getCause.getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read.option("multiLine", true).csv(inputFile.toURI.toString).collect()
+        }
+        assert(e2.getCause.getCause.getCause.isInstanceOf[EOFException])
+        assert(e2.getCause.getCause.getCause.getMessage === "Unexpected end of input stream")
       }
       withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
         assert(spark.read.csv(inputFile.toURI.toString).collect().isEmpty)
+        assert(spark.read.option("multiLine", true).csv(inputFile.toURI.toString).collect()
+          .isEmpty)
+      }
+    })
+    withTempPath { dir =>
+      val csvPath = new Path(dir.getCanonicalPath, "csv")
+      val fs = csvPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      sampledTestData.write.csv(csvPath.toString)
+      val df = spark.read.option("multiLine", true).csv(csvPath.toString)
+      fs.delete(csvPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+        assert(e.getCause.getMessage.contains(".csv does not exist"))
+      }
+
+      sampledTestData.write.csv(csvPath.toString)
+      val df2 = spark.read.option("multiLine", true).csv(csvPath.toString)
+      fs.delete(csvPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+        assert(df2.collect().isEmpty)
       }
-    } finally {
-      inputFile.delete()
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 11779286ec2..5096b241f56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
-import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUpgradeException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUpgradeException, TestUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
@@ -1913,6 +1913,50 @@ abstract class JsonSuite
     }
   }
 
+  test("SPARK-45035: json enable ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.json(inputFile.toURI.toString).collect()
+        }
+        assert(e.getCause.getCause.isInstanceOf[EOFException])
+        assert(e.getCause.getCause.getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read.option("multiLine", true).json(inputFile.toURI.toString).collect()
+        }
+        assert(e2.getCause.isInstanceOf[EOFException])
+        assert(e2.getCause.getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        assert(spark.read.json(inputFile.toURI.toString).collect().isEmpty)
+        assert(spark.read.option("multiLine", true).json(inputFile.toURI.toString).collect()
+          .isEmpty)
+      }
+    })
+    withTempPath { dir =>
+      val jsonPath = new Path(dir.getCanonicalPath, "json")
+      val fs = jsonPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      sampledTestData.write.json(jsonPath.toString)
+      val df = spark.read.option("multiLine", true).json(jsonPath.toString)
+      fs.delete(jsonPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+        assert(e.getCause.getMessage.contains(".json does not exist"))
+      }
+
+      sampledTestData.write.json(jsonPath.toString)
+      val df2 = spark.read.option("multiLine", true).json(jsonPath.toString)
+      fs.delete(jsonPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+        assert(df2.collect().isEmpty)
+      }
+    }
+  }
+
   test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org