sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1414443217


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala:
##########
@@ -120,9 +136,27 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: 
Boolean)
       val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
       Some(inferObject(parser, rootAttributes))
     } catch {
-      case NonFatal(_) if options.parseMode == PermissiveMode =>
-        Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, 
StringType))))
-      case NonFatal(_) =>
+      case e @ (_: XMLStreamException | _: MalformedInputException | _: 
SAXException) =>
+        handleXmlErrorsByParseMode(options.parseMode, 
options.columnNameOfCorruptRecord, e)
+      case e: CharConversionException if options.charset.isEmpty =>
+        val msg =
+          """XML parser cannot handle a character in its input.
+            |Specifying encoding as an input option explicitly might help to 
resolve the issue.
+            |""".stripMargin + e.getMessage
+        val wrappedCharException = new CharConversionException(msg)
+        wrappedCharException.initCause(e)
+        handleXmlErrorsByParseMode(
+          options.parseMode,
+          options.columnNameOfCorruptRecord,
+          wrappedCharException)
+      case e: FileNotFoundException if options.ignoreMissingFiles =>
+        logWarning("Skipped missing file", e)
+        Some(StructType(Nil))
+      case e: FileNotFoundException if !options.ignoreMissingFiles => throw e
+      case e @ (_: IOException | _: RuntimeException) if 
options.ignoreCorruptFiles =>
+        logWarning("Skipped the rest of the content in the corrupted file", e)
+        Some(StructType(Nil))
+      case NonFatal(e) =>
         None

Review Comment:
   ```suggestion
           handleXmlErrorsByParseMode(options.parseMode, 
options.columnNameOfCorruptRecord, e)
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession 
{
     )
     testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> 
"true"))
   }
+
+  test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.option("rowTag", 
"ROW").xml(inputFile.toURI.toString).collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end 
of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end 
of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        assert(spark.read.option("rowTag", 
"ROW").xml(inputFile.toURI.toString).collect().isEmpty)

Review Comment:
   `multiLine` is set to `true` by default for XML.
   ```suggestion
             spark.read.option("rowTag", "ROW").option("multiLine", 
false).xml(inputFile.toURI.toString).collect()
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala:
##########
@@ -172,13 +176,27 @@ object MultiLineXmlDataSource extends XmlDataSource {
       parsedOptions: XmlOptions): StructType = {
     val xml = createBaseRdd(sparkSession, inputPaths, parsedOptions)
 
-    val tokenRDD = xml.flatMap { portableDataStream =>
-      StaxXmlParser.tokenizeStream(
-        CodecStreams.createInputStreamWithCloseResource(
-          portableDataStream.getConfiguration,
-          new Path(portableDataStream.getPath())),
-        parsedOptions)
-    }
+    val tokenRDD: RDD[String] =
+      xml.flatMap { portableDataStream =>
+        try {
+          StaxXmlParser.tokenizeStream(
+            CodecStreams.createInputStreamWithCloseResource(
+              portableDataStream.getConfiguration,
+              new Path(portableDataStream.getPath())),
+            parsedOptions)
+        } catch {

Review Comment:
   I don't see `ignoreCorruptFiles` handling here



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession 
{
     )
     testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> 
"true"))
   }
+
+  test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.option("rowTag", 
"ROW").xml(inputFile.toURI.toString).collect()

Review Comment:
   `multiLine` is set to `true` by default for XML.
   ```suggestion
             spark.read.option("rowTag", "ROW").option("multiLine", 
false).xml(inputFile.toURI.toString).collect()
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession 
{
     )
     testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> 
"true"))
   }
+
+  test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.option("rowTag", 
"ROW").xml(inputFile.toURI.toString).collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end 
of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end 
of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        assert(spark.read.option("rowTag", 
"ROW").xml(inputFile.toURI.toString).collect().isEmpty)
+        assert(

Review Comment:
   Not sure how this is working without `ignoreCorruptFiles` handling in 
`XMLDataSource`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala:
##########
@@ -120,9 +136,27 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: 
Boolean)
       val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
       Some(inferObject(parser, rootAttributes))
     } catch {
-      case NonFatal(_) if options.parseMode == PermissiveMode =>
-        Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, 
StringType))))
-      case NonFatal(_) =>
+      case e @ (_: XMLStreamException | _: MalformedInputException | _: 
SAXException) =>
+        handleXmlErrorsByParseMode(options.parseMode, 
options.columnNameOfCorruptRecord, e)

Review Comment:
   These might be the only Exceptions. But just for defensive purpose, let's 
catch all NonFatal errors at the end as indicated.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession 
{
     )
     testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> 
"true"))
   }
+
+  test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {

Review Comment:
   please add scenarios with user specified schema.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala:
##########
@@ -120,9 +136,27 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: 
Boolean)
       val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
       Some(inferObject(parser, rootAttributes))
     } catch {
-      case NonFatal(_) if options.parseMode == PermissiveMode =>
-        Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, 
StringType))))
-      case NonFatal(_) =>
+      case e @ (_: XMLStreamException | _: MalformedInputException | _: 
SAXException) =>
+        handleXmlErrorsByParseMode(options.parseMode, 
options.columnNameOfCorruptRecord, e)
+      case e: CharConversionException if options.charset.isEmpty =>
+        val msg =
+          """XML parser cannot handle a character in its input.
+            |Specifying encoding as an input option explicitly might help to 
resolve the issue.
+            |""".stripMargin + e.getMessage
+        val wrappedCharException = new CharConversionException(msg)
+        wrappedCharException.initCause(e)
+        handleXmlErrorsByParseMode(
+          options.parseMode,
+          options.columnNameOfCorruptRecord,
+          wrappedCharException)
+      case e: FileNotFoundException if options.ignoreMissingFiles =>
+        logWarning("Skipped missing file", e)
+        Some(StructType(Nil))
+      case e: FileNotFoundException if !options.ignoreMissingFiles => throw e
+      case e @ (_: IOException | _: RuntimeException) if 
options.ignoreCorruptFiles =>
+        logWarning("Skipped the rest of the content in the corrupted file", e)
+        Some(StructType(Nil))

Review Comment:
   These exceptions are not possible here as input is a `String` as opposed to 
a stream.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to