This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new fcf340a1de3 [SPARK-45912][SQL] Enhancement of XSDToSchema API: Change to HDFS API for cloud storage accessibility
fcf340a1de3 is described below

commit fcf340a1de371ce1beb2cf93473ea2f2b793801b
Author: Shujing Yang <shujing.y...@databricks.com>
AuthorDate: Fri Nov 17 09:17:57 2023 +0900

    [SPARK-45912][SQL] Enhancement of XSDToSchema API: Change to HDFS API for cloud storage accessibility

    ### What changes were proposed in this pull request?

    Previously, the XSDToSchema API used `java.nio.file.Path`, which limited file reading to the local file system only. By changing this to an HDFS-compatible API, we now enable the XSDToSchema function to access files in cloud storage.

    ### Why are the changes needed?

    We want to enable the XSDToSchema function to access files in cloud storage.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Unit tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43789 from shujingyang-db/xsd_api.

    Authored-by: Shujing Yang <shujing.y...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/catalyst/xml/ValidatorUtil.scala   | 36 ++++++++++++-------
 .../execution/datasources/xml/XSDToSchema.scala  | 35 +++++++-----------
 .../datasources/xml/util/XSDToSchemaSuite.scala  | 41 +++++++++++-----------
 3 files changed, 55 insertions(+), 57 deletions(-)
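A note for readers skimming the diff below: the user-visible effect is that `XSDToSchema.read` now accepts an `org.apache.hadoop.fs.Path` instead of a `java.io.File`/`java.nio.file.Path`, so the XSD no longer has to sit on the local disk. The following spark-shell sketch is illustrative only and not part of the commit; the `s3a://` bucket, file names, and `rowTag` value are assumptions, and it presumes the built-in XML data source on this branch plus an S3 connector on the classpath.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.spark.sql.types.StructType

// Derive a Spark schema from an XSD stored in object storage; any Hadoop-compatible
// scheme works as long as the matching FileSystem implementation is available.
val schema: StructType = XSDToSchema.read(new Path("s3a://my-bucket/schemas/basket.xsd"))

// Use the derived schema when reading the XML data itself.
val df = spark.read
  .format("xml")
  .option("rowTag", "basket")
  .schema(schema)
  .load("s3a://my-bucket/data/baskets.xml")
```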
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
index f8b546332c2..0d85a512d7e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.catalyst.xml
 
+import java.io.{File, FileInputStream, InputStream}
 import javax.xml.XMLConstants
 import javax.xml.transform.stream.StreamSource
 import javax.xml.validation.{Schema, SchemaFactory}
@@ -25,28 +26,18 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkFiles
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.internal.Logging
 
 /**
  * Utilities for working with XSD validation.
  */
-private[sql] object ValidatorUtil {
+private[sql] object ValidatorUtil extends Logging{
   // Parsing XSDs may be slow, so cache them by path:
   private val cache = CacheBuilder.newBuilder().softValues().build(
     new CacheLoader[String, Schema] {
       override def load(key: String): Schema = {
-        val in = try {
-          // Handle case where file exists as specified
-          val fs = Utils.getHadoopFileSystem(key, SparkHadoopUtil.get.conf)
-          fs.open(new Path(key))
-        } catch {
-          case _: Throwable =>
-            // Handle case where it was added with sc.addFile
-            val addFileUrl = SparkFiles.get(key)
-            val fs = Utils.getHadoopFileSystem(addFileUrl, SparkHadoopUtil.get.conf)
-            fs.open(new Path(addFileUrl))
-        }
+        val in = openSchemaFile(new Path(key))
         try {
           val schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
           schemaFactory.newSchema(new StreamSource(in))
@@ -56,6 +47,25 @@ private[sql] object ValidatorUtil {
       }
     })
 
+  def openSchemaFile(xsdPath: Path): InputStream = {
+    try {
+      // Handle case where file exists as specified
+      val fs = xsdPath.getFileSystem(SparkHadoopUtil.get.conf)
+      fs.open(xsdPath)
+    } catch {
+      case e: Throwable =>
+        // Handle case where it was added with sc.addFile
+        // When they are added via sc.addFile, they are always downloaded to local file system
+        logInfo(s"$xsdPath was not found, falling back to look up files added by Spark")
+        val f = new File(SparkFiles.get(xsdPath.toString))
+        if (f.exists()) {
+          new FileInputStream(f)
+        } else {
+          throw e
+        }
+    }
+  }
+
   /**
    * Parses the XSD at the given local path and caches it.
    *
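The new `openSchemaFile` above first tries the path through whatever Hadoop `FileSystem` its scheme selects, and only then falls back to files shipped with `SparkContext.addFile`. Since `ValidatorUtil` is `private[sql]` and not callable from user code, here is a small self-contained sketch of the same open-with-fallback pattern; the object name and the idea of passing a bare file name are illustrative assumptions, not part of the commit.

```scala
import java.io.{File, FileInputStream, InputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkFiles

object XsdStreamOpener {
  // Mirrors the fallback behaviour of ValidatorUtil.openSchemaFile above.
  def open(xsdPath: Path, hadoopConf: Configuration): InputStream = {
    try {
      // First attempt: resolve the path with the FileSystem that matches its
      // scheme (file://, hdfs://, s3a://, ...), exactly as given.
      xsdPath.getFileSystem(hadoopConf).open(xsdPath)
    } catch {
      case e: Throwable =>
        // Fallback: a file registered with SparkContext.addFile is downloaded
        // to the local file system, so look it up there by its name.
        val local = new File(SparkFiles.get(xsdPath.toString))
        if (local.exists()) new FileInputStream(local) else throw e
    }
  }
}
```

With this behaviour, an XSD distributed via `spark.sparkContext.addFile(...)` can be referenced by its bare file name (for example through the XML option `rowValidationXSDPath`, which is what `ValidatorUtil` serves), and the lookup still succeeds on nodes that only have the locally downloaded copy.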
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala
index b0894ed3484..356ffd57698 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala
@@ -16,53 +16,42 @@
  */
 package org.apache.spark.sql.execution.datasources.xml
 
-import java.io.{File, FileInputStream, InputStreamReader, StringReader}
-import java.nio.charset.StandardCharsets
-import java.nio.file.Path
+import java.io.StringReader
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.shaded.org.jline.utils.InputStreamReader
 import org.apache.ws.commons.schema._
 import org.apache.ws.commons.schema.constants.Constants
 
-import org.apache.spark.sql.catalyst.xml.XmlOptions
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.xml.{ValidatorUtil, XmlOptions}
 import org.apache.spark.sql.types._
 
 /**
  * Utility to generate a Spark schema from an XSD. Not all XSD schemas are simple tabular schemas,
  * so not all elements or XSDs are supported.
  */
-object XSDToSchema {
+object XSDToSchema extends Logging{
 
   /**
-   * Reads a schema from an XSD file.
+   * Reads a schema from an XSD path.
    * Note that if the schema consists of one complex parent type which you want to use as
    * the row tag schema, then you will need to extract the schema of the single resulting
    * struct in the resulting StructType, and use its StructType as your schema.
    *
-   * @param xsdFile XSD file
+   * @param xsdPath XSD path
    * @return Spark-compatible schema
    */
-  def read(xsdFile: File): StructType = {
+  def read(xsdPath: Path): StructType = {
+    val in = ValidatorUtil.openSchemaFile(xsdPath)
     val xmlSchemaCollection = new XmlSchemaCollection()
-    xmlSchemaCollection.setBaseUri(xsdFile.getParent)
-    val xmlSchema = xmlSchemaCollection.read(
-      new InputStreamReader(new FileInputStream(xsdFile), StandardCharsets.UTF_8))
-
+    xmlSchemaCollection.setBaseUri(xsdPath.getParent.toString)
+    val xmlSchema = xmlSchemaCollection.read(new InputStreamReader(in))
     getStructType(xmlSchema)
   }
 
-  /**
-   * Reads a schema from an XSD file.
-   * Note that if the schema consists of one complex parent type which you want to use as
-   * the row tag schema, then you will need to extract the schema of the single resulting
-   * struct in the resulting StructType, and use its StructType as your schema.
-   *
-   * @param xsdFile XSD file
-   * @return Spark-compatible schema
-   */
-  def read(xsdFile: Path): StructType = read(xsdFile.toFile)
-
   /**
    * Reads a schema from an XSD as a string.
    * Note that if the schema consists of one complex parent type which you want to use as
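One detail in the hunk above worth noting: `setBaseUri(xsdPath.getParent.toString)` is what allows `XmlSchemaCollection` to resolve relative `xs:include`/`xs:import` references against the directory containing the XSD, the behaviour covered by the "Relative path parsing" test in the suite below. A hypothetical spark-shell sketch; the directory layout and file contents are assumptions, not part of the commit:

```scala
// Assumed layout on any Hadoop-compatible file system:
//   /schemas/first.xsd   contains <xs:include schemaLocation="second.xsd"/>
//   /schemas/second.xsd  defines the types that first.xsd references
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema

// Because the base URI is set to first.xsd's parent directory, the relative
// include of second.xsd resolves without any extra configuration.
val schema = XSDToSchema.read(new Path("/schemas/first.xsd"))
println(schema.treeString)
```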
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala
index 9d8b1eec8f7..434b4655d40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala
@@ -16,7 +16,9 @@
  */
 package org.apache.spark.sql.execution.datasources.xml.util
 
-import java.nio.file.Paths
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.execution.datasources.xml.TestUtils._
 import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
@@ -28,8 +30,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   private val resDir = "test-data/xml-resources/"
 
   test("Basic parsing") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "basket.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "basket.xsd")))
     val expectedSchema = buildSchema(
       field("basket",
         structField(
@@ -40,8 +41,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Relative path parsing") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "include-example/first.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "include-example/first.xsd")))
     val expectedSchema = buildSchema(
       field("basket",
         structField(
@@ -52,8 +52,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Test schema types and attributes") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "catalog.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "catalog.xsd")))
     val expectedSchema = buildSchema(
       field("catalog",
         structField(
@@ -76,23 +75,20 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Test xs:choice nullability") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "choice.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "choice.xsd")))
     val expectedSchema = buildSchema(
       field("el", structField(field("foo"), field("bar"), field("baz")), nullable = false))
     assert(expectedSchema === parsedSchema)
   }
 
   test("Two root elements") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "twoelements.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "twoelements.xsd")))
     val expectedSchema = buildSchema(field("bar", nullable = false), field("foo", nullable = false))
     assert(expectedSchema === parsedSchema)
   }
 
   test("xs:any schema") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "xsany.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "xsany.xsd")))
     val expectedSchema = buildSchema(
       field("root",
         structField(
@@ -117,8 +113,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Tests xs:long type / Issue 520") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "long.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "long.xsd")))
     val expectedSchema = buildSchema(
       field("test", structField(field("userId", LongType, nullable = false)), nullable = false))
 
@@ -126,8 +121,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Test xs:decimal type with restriction[fractionalDigits]") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
-      "decimal-with-restriction.xsd").replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "decimal-with-restriction.xsd")))
     val expectedSchema = buildSchema(
       field("decimal_type_3", DecimalType(12, 6), nullable = false),
       field("decimal_type_1", DecimalType(38, 18), nullable = false),
@@ -137,8 +131,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Test ref attribute / Issue 617") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "ref-attribute.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "ref-attribute.xsd")))
     val expectedSchema = buildSchema(
       field(
         "book",
@@ -166,8 +159,8 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Test complex content with extension element / Issue 554") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
-      "complex-content-extension.xsd").replace("file:/", "/")))
+    val parsedSchema =
+      XSDToSchema.read(new Path(testFile(resDir + "complex-content-extension.xsd")))
 
     val expectedSchema = buildSchema(
       field(
@@ -184,4 +177,10 @@ class XSDToSchemaSuite extends SharedSparkSession {
     )
     assert(parsedSchema === expectedSchema)
   }
+
+  test("SPARK-45912: Test XSDToSchema when open not found files") {
+    intercept[FileNotFoundException] {
+      XSDToSchema.read(new Path("/path/not/found"))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org