sandip-db commented on code in PR #44163: URL: https://github.com/apache/spark/pull/44163#discussion_r1414443217
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala: ########## @@ -120,9 +136,27 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean) val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser) Some(inferObject(parser, rootAttributes)) } catch { - case NonFatal(_) if options.parseMode == PermissiveMode => - Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, StringType)))) - case NonFatal(_) => + case e @ (_: XMLStreamException | _: MalformedInputException | _: SAXException) => + handleXmlErrorsByParseMode(options.parseMode, options.columnNameOfCorruptRecord, e) + case e: CharConversionException if options.charset.isEmpty => + val msg = + """XML parser cannot handle a character in its input. + |Specifying encoding as an input option explicitly might help to resolve the issue. + |""".stripMargin + e.getMessage + val wrappedCharException = new CharConversionException(msg) + wrappedCharException.initCause(e) + handleXmlErrorsByParseMode( + options.parseMode, + options.columnNameOfCorruptRecord, + wrappedCharException) + case e: FileNotFoundException if options.ignoreMissingFiles => + logWarning("Skipped missing file", e) + Some(StructType(Nil)) + case e: FileNotFoundException if !options.ignoreMissingFiles => throw e + case e @ (_: IOException | _: RuntimeException) if options.ignoreCorruptFiles => + logWarning("Skipped the rest of the content in the corrupted file", e) + Some(StructType(Nil)) + case NonFatal(e) => None Review Comment: ```suggestion handleXmlErrorsByParseMode(options.parseMode, options.columnNameOfCorruptRecord, e) ``` ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ########## @@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession { ) testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true")) } + + test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { + 
withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val e = intercept[SparkException] { + spark.read.option("rowTag", "ROW").xml(inputFile.toURI.toString).collect() + } + assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream") + val e2 = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream") + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + assert(spark.read.option("rowTag", "ROW").xml(inputFile.toURI.toString).collect().isEmpty) Review Comment: `multiLine` is set to `true` by default for XML. ```suggestion spark.read.option("rowTag", "ROW").option("multiLine", false).xml(inputFile.toURI.toString).collect() ``` ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala: ########## @@ -172,13 +176,27 @@ object MultiLineXmlDataSource extends XmlDataSource { parsedOptions: XmlOptions): StructType = { val xml = createBaseRdd(sparkSession, inputPaths, parsedOptions) - val tokenRDD = xml.flatMap { portableDataStream => - StaxXmlParser.tokenizeStream( - CodecStreams.createInputStreamWithCloseResource( - portableDataStream.getConfiguration, - new Path(portableDataStream.getPath())), - parsedOptions) - } + val tokenRDD: RDD[String] = + xml.flatMap { portableDataStream => + try { + StaxXmlParser.tokenizeStream( + CodecStreams.createInputStreamWithCloseResource( + portableDataStream.getConfiguration, + new Path(portableDataStream.getPath())), + parsedOptions) + } catch { Review Comment: I don't see `ignoreCorruptFiles` handling here ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: 
########## @@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession { ) testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true")) } + + test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { + withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val e = intercept[SparkException] { + spark.read.option("rowTag", "ROW").xml(inputFile.toURI.toString).collect() Review Comment: `multiLine` is set to `true` by default for XML. ```suggestion spark.read.option("rowTag", "ROW").option("multiLine", false).xml(inputFile.toURI.toString).collect() ``` ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ########## @@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession { ) testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true")) } + + test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { + withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val e = intercept[SparkException] { + spark.read.option("rowTag", "ROW").xml(inputFile.toURI.toString).collect() + } + assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream") + val e2 = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream") + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + assert(spark.read.option("rowTag", "ROW").xml(inputFile.toURI.toString).collect().isEmpty) + assert( Review Comment: I'm not sure how this works without `ignoreCorruptFiles` handling in `XmlDataSource`. 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala: ########## @@ -120,9 +136,27 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean) val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser) Some(inferObject(parser, rootAttributes)) } catch { - case NonFatal(_) if options.parseMode == PermissiveMode => - Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, StringType)))) - case NonFatal(_) => + case e @ (_: XMLStreamException | _: MalformedInputException | _: SAXException) => + handleXmlErrorsByParseMode(options.parseMode, options.columnNameOfCorruptRecord, e) Review Comment: These might be the only exceptions, but to be defensive, let's catch all NonFatal errors at the end as indicated. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ########## @@ -2178,4 +2186,58 @@ class XmlSuite extends QueryTest with SharedSparkSession { ) testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true")) } + + test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { + withCorruptFile(inputFile => { Review Comment: Please add scenarios with a user-specified schema. 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala: ########## @@ -120,9 +136,27 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: Boolean) val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser) Some(inferObject(parser, rootAttributes)) } catch { - case NonFatal(_) if options.parseMode == PermissiveMode => - Some(StructType(Seq(StructField(options.columnNameOfCorruptRecord, StringType)))) - case NonFatal(_) => + case e @ (_: XMLStreamException | _: MalformedInputException | _: SAXException) => + handleXmlErrorsByParseMode(options.parseMode, options.columnNameOfCorruptRecord, e) + case e: CharConversionException if options.charset.isEmpty => + val msg = + """XML parser cannot handle a character in its input. + |Specifying encoding as an input option explicitly might help to resolve the issue. + |""".stripMargin + e.getMessage + val wrappedCharException = new CharConversionException(msg) + wrappedCharException.initCause(e) + handleXmlErrorsByParseMode( + options.parseMode, + options.columnNameOfCorruptRecord, + wrappedCharException) + case e: FileNotFoundException if options.ignoreMissingFiles => + logWarning("Skipped missing file", e) + Some(StructType(Nil)) + case e: FileNotFoundException if !options.ignoreMissingFiles => throw e + case e @ (_: IOException | _: RuntimeException) if options.ignoreCorruptFiles => + logWarning("Skipped the rest of the content in the corrupted file", e) + Some(StructType(Nil)) Review Comment: These exceptions are not possible here as input is a `String` as opposed to a stream. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org