sandip-db commented on code in PR #44163: URL: https://github.com/apache/spark/pull/44163#discussion_r1422984743
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala: ########## @@ -750,7 +752,20 @@ object StaxXmlParser { throw QueryExecutionErrors.endOfStreamError() } val curRecord = convert(nextRecord.get) - nextRecord = xmlTokenizer.next() + try { + nextRecord = xmlTokenizer.next() + } catch { + case _: FileNotFoundException if options.ignoreMissingFiles => Review Comment: Move this try..catch into XmlTokenizer.next ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ########## @@ -2178,4 +2207,108 @@ class XmlSuite extends QueryTest with SharedSparkSession { ) testWriteReadRoundTrip(df, Map("nullValue" -> "null", "prefersDecimal" -> "true")) } + + test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { + withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val e = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", false) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream") + val e2 = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream") + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + spark.read + .option("rowTag", "ROW") + .option("multiLine", false) + .xml(inputFile.toURI.toString) + .collect() + assert( + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + .isEmpty + ) + } + }) + withTempPath { dir => + import org.apache.hadoop.fs.Path + val xmlPath = new Path(dir.getCanonicalPath, "xml") + val fs = 
xmlPath.getFileSystem(spark.sessionState.newHadoopConf()) + + sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString) + val df = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString) + fs.delete(xmlPath, true) + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") { + val e = intercept[SparkException] { + df.collect() + } + assert(e.getCause.isInstanceOf[SparkFileNotFoundException]) + assert(e.getCause.getMessage.contains(".xml does not exist")) + } + + sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString) + val df2 = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString) + fs.delete(xmlPath, true) + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") { + assert(df2.collect().isEmpty) + } + } + } + + test("SPARK-46248: Read from a corrupted compressed file") { + withTempDir { dir => + val format = "xml" + val numRecords = 10000 + // create data + val data = + spark.sparkContext.parallelize( + (0 until numRecords).map(i => Row(i.toString, (i * 2).toString))) + val schema = buildSchema(field("a1"), field("a2")) + val df = spark.createDataFrame(data, schema) + + df.coalesce(4) + .write + .mode(SaveMode.Overwrite) + .format(format) + .option("compression", "gZiP") + .option("rowTag", "row") + .save(dir.getCanonicalPath) + + withCorruptedFile(dir) { corruptedDir => + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + val dfCorrupted = spark.read + .format(format) + .option("multiline", "true") + .option("compression", "gzip") + .option("rowTag", "row") + .load(corruptedDir.getCanonicalPath) + assert(!dfCorrupted.isEmpty) Review Comment: ```suggestion assert(dfCorrupted.collect().length > 100) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org