This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 11e7ea4f11d [SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON reporting errors
11e7ea4f11d is described below

commit 11e7ea4f11df71e2942322b01fcaab57dac20c83
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Wed Oct 18 11:06:43 2023 +0500

    [SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON reporting errors
    
    ### What changes were proposed in this pull request?
    Fix the error reported when ignoreCorruptFiles/ignoreMissingFiles is enabled with multiline CSV/JSON. The failure looked like:
    ```log
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4940.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4940.0 (TID 4031) (10.68.177.106 executor 0): com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input
    Parser Configuration: CsvParserSettings:
            Auto configuration enabled=true
            Auto-closing enabled=true
            Autodetect column delimiter=false
            Autodetect quotes=false
            Column reordering enabled=true
            Delimiters for detection=null
            Empty value=
            Escape unquoted values=false
            Header extraction enabled=null
            Headers=null
            Ignore leading whitespaces=false
            Ignore leading whitespaces in quotes=false
            Ignore trailing whitespaces=false
            Ignore trailing whitespaces in quotes=false
            Input buffer size=1048576
            Input reading on separate thread=false
            Keep escape sequences=false
            Keep quotes=false
            Length of content displayed on error=1000
            Line separator detection enabled=true
            Maximum number of characters per column=-1
            Maximum number of columns=20480
            Normalize escaped line separators=true
            Null value=
            Number of records to read=all
            Processor=none
            Restricting data in exceptions=false
            RowProcessor error handler=null
            Selected fields=none
            Skip bits as whitespace=true
            Skip empty lines=true
            Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
            CsvFormat:
                    Comment character=#
                    Field delimiter=,
                    Line separator (normalized)=\n
                    Line separator sequence=\n
                    Quote character="
                    Quote escape character=\
                    Quote escape escape character=null
    Internal state when error was thrown: line=0, column=0, record=0
            at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:402)
            at com.univocity.parsers.common.AbstractParser.beginParsing(AbstractParser.java:277)
            at com.univocity.parsers.common.AbstractParser.beginParsing(AbstractParser.java:843)
            at org.apache.spark.sql.catalyst.csv.UnivocityParser$$anon$1.<init>(UnivocityParser.scala:463)
            at org.apache.spark.sql.catalyst.csv.UnivocityParser$.convertStream(UnivocityParser.scala:46...
    ```
    This happens because multiline CSV/JSON uses `BinaryFileRDD`, not `FileScanRDD`. Unlike `FileScanRDD`, which checks the `ignoreCorruptFiles` config when it hits a corrupt file so that the `IOException` is not reported, `BinaryFileRDD` does not surface the error because it simply returns a normal `PortableDataStream`. So we should catch the exception inside the lambda function used for schema inference, and do the same for `ignoreMissingFiles`, as sketched below.
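    
    A condensed sketch of the new handling, mirroring the `MultiLineCSVDataSource.infer` change in the diff below (`csv`, `parsedOptions`, `ignoreCorruptFiles` and `ignoreMissingFiles` are defined as in that diff; `JsonInferSchema` gets the equivalent cases for JSON):
    ```scala
    csv.flatMap { lines =>
      try {
        // Normal path: tokenize the PortableDataStream to infer the schema.
        val path = new Path(lines.getPath())
        UnivocityParser.tokenizeStream(
          CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
          shouldDropHeader = false,
          new CsvParser(parsedOptions.asParserSettings),
          encoding = parsedOptions.charset)
      } catch {
        // Missing file: skip it only when ignoreMissingFiles is enabled.
        case e: FileNotFoundException if ignoreMissingFiles =>
          logWarning(s"Skipped missing file: ${lines.getPath()}", e)
          Array.empty[Array[String]]
        // Corrupt file: skip its content only when ignoreCorruptFiles is enabled;
        // any other exception still propagates as before.
        case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
          logWarning(s"Skipped the rest of the content in the corrupted file: ${lines.getPath()}", e)
          Array.empty[Array[String]]
      }
    }
    ```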
    
    ### Why are the changes needed?
    Fix the bug that occurs when multiline mode is used together with the ignoreCorruptFiles/ignoreMissingFiles configs.
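    
    For reference, a read shaped like the following now respects the configs during multiline schema inference (a minimal sketch; the input path is made up, the config keys are the ones exercised by the new tests):
    ```scala
    // Skip corrupt/missing files instead of failing multiline schema inference.
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
    // Hypothetical input path, used purely for illustration.
    val df = spark.read.option("multiLine", true).csv("/tmp/maybe-corrupt-csv/")
    df.show()
    ```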
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added new tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #42979 from Hisoka-X/SPARK-45035_csv_multi_line.
    
    Authored-by: Jia Fan <fanjiaemi...@qq.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/catalyst/json/JsonInferSchema.scala  | 18 +++++--
 .../execution/datasources/csv/CSVDataSource.scala  | 28 ++++++++---
 .../datasources/CommonFileDataSourceSuite.scala    | 28 +++++++++++
 .../sql/execution/datasources/csv/CSVSuite.scala   | 58 +++++++++++++---------
 .../sql/execution/datasources/json/JsonSuite.scala | 46 ++++++++++++++++-
 5 files changed, 142 insertions(+), 36 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 4123c5290b6..4d04b34876c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.json
 
-import java.io.CharConversionException
+import java.io.{CharConversionException, FileNotFoundException, IOException}
 import java.nio.charset.MalformedInputException
 import java.util.Comparator
 
@@ -25,6 +25,7 @@ import scala.util.control.Exception.allCatch
 
 import com.fasterxml.jackson.core._
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
@@ -36,7 +37,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
+private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
 
   private val decimalParser = ExprUtils.getDecimalParser(options.locale)
 
@@ -53,6 +54,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
     isParsing = true,
     forTimestampNTZ = true)
 
+  private val ignoreCorruptFiles = options.ignoreCorruptFiles
+  private val ignoreMissingFiles = options.ignoreMissingFiles
+
   private def handleJsonErrorsByParseMode(parseMode: ParseMode,
       columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
     parseMode match {
@@ -88,8 +92,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
             Some(inferField(parser))
           }
         } catch {
-          case e @ (_: RuntimeException | _: JsonProcessingException |
-                    _: MalformedInputException) =>
+          case e @ (_: JsonProcessingException | _: MalformedInputException) =>
            handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, e)
           case e: CharConversionException if options.encoding.isEmpty =>
             val msg =
@@ -99,6 +102,13 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
             val wrappedCharException = new CharConversionException(msg)
             wrappedCharException.initCause(e)
            handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, wrappedCharException)
+          case e: FileNotFoundException if ignoreMissingFiles =>
+            logWarning("Skipped missing file", e)
+            Some(StructType(Nil))
+          case e: FileNotFoundException if !ignoreMissingFiles => throw e
+          case e @ (_: IOException | _: RuntimeException) if ignoreCorruptFiles =>
+            logWarning("Skipped the rest of the content in the corrupted file", e)
+            Some(StructType(Nil))
         }
       }.reduceOption(typeMerger).iterator
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 99d43953c4c..cf7c536bdae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
+import java.io.{FileNotFoundException, IOException}
 import java.nio.charset.{Charset, StandardCharsets}
 
 import com.univocity.parsers.csv.CsvParser
@@ -168,7 +169,7 @@ object TextInputCSVDataSource extends CSVDataSource {
   }
 }
 
-object MultiLineCSVDataSource extends CSVDataSource {
+object MultiLineCSVDataSource extends CSVDataSource with Logging {
   override val isSplitable: Boolean = false
 
   override def readFile(
@@ -189,13 +190,26 @@ object MultiLineCSVDataSource extends CSVDataSource {
       inputPaths: Seq[FileStatus],
       parsedOptions: CSVOptions): StructType = {
     val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
+    val ignoreCorruptFiles = parsedOptions.ignoreCorruptFiles
+    val ignoreMissingFiles = parsedOptions.ignoreMissingFiles
     csv.flatMap { lines =>
-      val path = new Path(lines.getPath())
-      UnivocityParser.tokenizeStream(
-        CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
-        shouldDropHeader = false,
-        new CsvParser(parsedOptions.asParserSettings),
-        encoding = parsedOptions.charset)
+      try {
+        val path = new Path(lines.getPath())
+        UnivocityParser.tokenizeStream(
+          CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
+          shouldDropHeader = false,
+          new CsvParser(parsedOptions.asParserSettings),
+          encoding = parsedOptions.charset)
+      } catch {
+        case e: FileNotFoundException if ignoreMissingFiles =>
+          logWarning(s"Skipped missing file: ${lines.getPath()}", e)
+          Array.empty[Array[String]]
+        case e: FileNotFoundException if !ignoreMissingFiles => throw e
+        case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+          logWarning(
+            s"Skipped the rest of the content in the corrupted file: ${lines.getPath()}", e)
+          Array.empty[Array[String]]
+      }
     }.take(1).headOption match {
       case Some(firstRow) =>
        val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
index 739f4c440be..2e3a4bbafb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.{ByteArrayOutputStream, File, FileOutputStream}
+import java.util.zip.GZIPOutputStream
+
 import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
 
 import org.apache.spark.sql.{Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
@@ -60,4 +63,29 @@ trait CommonFileDataSourceSuite extends SQLHelper {
       }
     }
   }
+
+  protected def withCorruptFile(f: File => Unit): Unit = {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+      f(inputFile)
+    } finally {
+      inputFile.delete()
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 7655635fc62..38fbf466882 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
-import java.io.{ByteArrayOutputStream, EOFException, File, FileOutputStream}
+import java.io.{EOFException, File}
 import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
 import java.nio.file.{Files, StandardOpenOption}
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
 import java.util.Locale
-import java.util.zip.GZIPOutputStream
 
 import scala.jdk.CollectionConverters._
 import scala.util.Properties
@@ -32,11 +31,12 @@ import scala.util.Properties
 import com.univocity.parsers.common.TextParsingException
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.commons.lang3.time.FastDateFormat
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.logging.log4j.Level
 
-import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUpgradeException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUpgradeException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
@@ -1447,37 +1447,47 @@ abstract class CSVSuite
     }
   }
 
-  test("Enabling/disabling ignoreCorruptFiles") {
-    val inputFile = File.createTempFile("input-", ".gz")
-    try {
-      // Create a corrupt gzip file
-      val byteOutput = new ByteArrayOutputStream()
-      val gzip = new GZIPOutputStream(byteOutput)
-      try {
-        gzip.write(Array[Byte](1, 2, 3, 4))
-      } finally {
-        gzip.close()
-      }
-      val bytes = byteOutput.toByteArray
-      val o = new FileOutputStream(inputFile)
-      try {
-        // It's corrupt since we only write half of bytes into the file.
-        o.write(bytes.take(bytes.length / 2))
-      } finally {
-        o.close()
-      }
+  test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
       withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
         val e = intercept[SparkException] {
           spark.read.csv(inputFile.toURI.toString).collect()
         }
         assert(e.getCause.getCause.isInstanceOf[EOFException])
        assert(e.getCause.getCause.getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read.option("multiLine", true).csv(inputFile.toURI.toString).collect()
+        }
+        assert(e2.getCause.getCause.getCause.isInstanceOf[EOFException])
+        assert(e2.getCause.getCause.getCause.getMessage === "Unexpected end of input stream")
       }
       withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
         assert(spark.read.csv(inputFile.toURI.toString).collect().isEmpty)
+        assert(spark.read.option("multiLine", true).csv(inputFile.toURI.toString).collect()
+          .isEmpty)
+      }
+    })
+    withTempPath { dir =>
+      val csvPath = new Path(dir.getCanonicalPath, "csv")
+      val fs = csvPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      sampledTestData.write.csv(csvPath.toString)
+      val df = spark.read.option("multiLine", true).csv(csvPath.toString)
+      fs.delete(csvPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+        assert(e.getCause.getMessage.contains(".csv does not exist"))
+      }
+
+      sampledTestData.write.csv(csvPath.toString)
+      val df2 = spark.read.option("multiLine", true).csv(csvPath.toString)
+      fs.delete(csvPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+        assert(df2.collect().isEmpty)
       }
-    } finally {
-      inputFile.delete()
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 11779286ec2..5096b241f56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
-import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUpgradeException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUpgradeException, TestUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
@@ -1913,6 +1913,50 @@ abstract class JsonSuite
     }
   }
 
+  test("SPARK-45035: json enable ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.json(inputFile.toURI.toString).collect()
+        }
+        assert(e.getCause.getCause.isInstanceOf[EOFException])
+        assert(e.getCause.getCause.getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read.option("multiLine", true).json(inputFile.toURI.toString).collect()
+        }
+        assert(e2.getCause.isInstanceOf[EOFException])
+        assert(e2.getCause.getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        assert(spark.read.json(inputFile.toURI.toString).collect().isEmpty)
+        assert(spark.read.option("multiLine", true).json(inputFile.toURI.toString).collect()
+          .isEmpty)
+      }
+    })
+    withTempPath { dir =>
+      val jsonPath = new Path(dir.getCanonicalPath, "json")
+      val fs = jsonPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      sampledTestData.write.json(jsonPath.toString)
+      val df = spark.read.option("multiLine", true).json(jsonPath.toString)
+      fs.delete(jsonPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+        assert(e.getCause.getMessage.contains(".json does not exist"))
+      }
+
+      sampledTestData.write.json(jsonPath.toString)
+      val df2 = spark.read.option("multiLine", true).json(jsonPath.toString)
+      fs.delete(jsonPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+        assert(df2.collect().isEmpty)
+      }
+    }
+  }
+
   test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
