This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fcf340a1de3 [SPARK-45912][SQL] Enhancement of XSDToSchema API: Change to HDFS API for cloud storage accessibility
fcf340a1de3 is described below

commit fcf340a1de371ce1beb2cf93473ea2f2b793801b
Author: Shujing Yang <shujing.y...@databricks.com>
AuthorDate: Fri Nov 17 09:17:57 2023 +0900

    [SPARK-45912][SQL] Enhancement of XSDToSchema API: Change to HDFS API for cloud storage accessibility
    
    ### What changes were proposed in this pull request?
    
    Previously, `XSDToSchema.read` took a `java.nio.file.Path`, which limited file reading to the local file system. Switching to the Hadoop-compatible `org.apache.hadoop.fs.Path` API lets the function read XSD files from cloud storage as well.
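    
    A minimal usage sketch with the new Hadoop-based signature; the `s3a://` bucket and object names below are hypothetical placeholders, not paths from this patch:
    
    ```scala
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
    import org.apache.spark.sql.types.StructType
    
    // The XSD is now opened through the Hadoop FileSystem resolved from the
    // path's scheme, so local, HDFS, and cloud object-store URIs can all be read.
    val schema: StructType =
      XSDToSchema.read(new Path("s3a://my-bucket/schemas/basket.xsd"))
    ```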
    
    ### Why are the changes needed?
    
    We want to enable the XSDToSchema function to access files in cloud storage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43789 from shujingyang-db/xsd_api.
    
    Authored-by: Shujing Yang <shujing.y...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/catalyst/xml/ValidatorUtil.scala     | 36 ++++++++++++-------
 .../execution/datasources/xml/XSDToSchema.scala    | 35 +++++++-----------
 .../datasources/xml/util/XSDToSchemaSuite.scala    | 41 +++++++++++-----------
 3 files changed, 55 insertions(+), 57 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
index f8b546332c2..0d85a512d7e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.catalyst.xml
 
+import java.io.{File, FileInputStream, InputStream}
 import javax.xml.XMLConstants
 import javax.xml.transform.stream.StreamSource
 import javax.xml.validation.{Schema, SchemaFactory}
@@ -25,28 +26,18 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkFiles
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.internal.Logging
 
 /**
  * Utilities for working with XSD validation.
  */
-private[sql] object ValidatorUtil {
+private[sql] object ValidatorUtil extends Logging {
   // Parsing XSDs may be slow, so cache them by path:
 
   private val cache = CacheBuilder.newBuilder().softValues().build(
     new CacheLoader[String, Schema] {
       override def load(key: String): Schema = {
-        val in = try {
-          // Handle case where file exists as specified
-          val fs = Utils.getHadoopFileSystem(key, SparkHadoopUtil.get.conf)
-          fs.open(new Path(key))
-        } catch {
-          case _: Throwable =>
-            // Handle case where it was added with sc.addFile
-            val addFileUrl = SparkFiles.get(key)
-            val fs = Utils.getHadoopFileSystem(addFileUrl, SparkHadoopUtil.get.conf)
-            fs.open(new Path(addFileUrl))
-        }
+        val in = openSchemaFile(new Path(key))
         try {
           val schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
           schemaFactory.newSchema(new StreamSource(in))
@@ -56,6 +47,25 @@ private[sql] object ValidatorUtil {
       }
     })
 
+  def openSchemaFile(xsdPath: Path): InputStream = {
+    try {
+      // Handle case where file exists as specified
+      val fs = xsdPath.getFileSystem(SparkHadoopUtil.get.conf)
+      fs.open(xsdPath)
+    } catch {
+      case e: Throwable =>
+        // Handle case where it was added with sc.addFile
+        // When they are added via sc.addFile, they are always downloaded to the local file system
+        logInfo(s"$xsdPath was not found, falling back to look up files added by Spark")
+        val f = new File(SparkFiles.get(xsdPath.toString))
+        if (f.exists()) {
+          new FileInputStream(f)
+        } else {
+          throw e
+        }
+    }
+  }
+
   /**
    * Parses the XSD at the given local path and caches it.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala
index b0894ed3484..356ffd57698 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XSDToSchema.scala
@@ -16,53 +16,42 @@
  */
 package org.apache.spark.sql.execution.datasources.xml
 
-import java.io.{File, FileInputStream, InputStreamReader, StringReader}
+import java.io.{InputStreamReader, StringReader}
 import java.nio.charset.StandardCharsets
-import java.nio.file.Path
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.hadoop.fs.Path
 import org.apache.ws.commons.schema._
 import org.apache.ws.commons.schema.constants.Constants
 
-import org.apache.spark.sql.catalyst.xml.XmlOptions
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.xml.{ValidatorUtil, XmlOptions}
 import org.apache.spark.sql.types._
 
 /**
  * Utility to generate a Spark schema from an XSD. Not all XSD schemas are simple tabular schemas,
  * so not all elements or XSDs are supported.
  */
-object XSDToSchema {
+object XSDToSchema extends Logging {
 
   /**
-   * Reads a schema from an XSD file.
+   * Reads a schema from an XSD path.
    * Note that if the schema consists of one complex parent type which you want to use as
    * the row tag schema, then you will need to extract the schema of the single resulting
    * struct in the resulting StructType, and use its StructType as your schema.
    *
-   * @param xsdFile XSD file
+   * @param xsdPath XSD path
    * @return Spark-compatible schema
    */
-  def read(xsdFile: File): StructType = {
+  def read(xsdPath: Path): StructType = {
+    val in = ValidatorUtil.openSchemaFile(xsdPath)
     val xmlSchemaCollection = new XmlSchemaCollection()
-    xmlSchemaCollection.setBaseUri(xsdFile.getParent)
-    val xmlSchema = xmlSchemaCollection.read(
-      new InputStreamReader(new FileInputStream(xsdFile), StandardCharsets.UTF_8))
-
+    xmlSchemaCollection.setBaseUri(xsdPath.getParent.toString)
+    val xmlSchema = xmlSchemaCollection.read(new InputStreamReader(in, StandardCharsets.UTF_8))
     getStructType(xmlSchema)
   }
 
-  /**
-   * Reads a schema from an XSD file.
-   * Note that if the schema consists of one complex parent type which you want to use as
-   * the row tag schema, then you will need to extract the schema of the single resulting
-   * struct in the resulting StructType, and use its StructType as your schema.
-   *
-   * @param xsdFile XSD file
-   * @return Spark-compatible schema
-   */
-  def read(xsdFile: Path): StructType = read(xsdFile.toFile)
-
   /**
    * Reads a schema from an XSD as a string.
    * Note that if the schema consists of one complex parent type which you want to use as
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala
index 9d8b1eec8f7..434b4655d40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/util/XSDToSchemaSuite.scala
@@ -16,7 +16,9 @@
  */
 package org.apache.spark.sql.execution.datasources.xml.util
 
-import java.nio.file.Paths
+import java.io.FileNotFoundException
+
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.execution.datasources.xml.TestUtils._
 import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
@@ -28,8 +30,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   private val resDir = "test-data/xml-resources/"
 
   test("Basic parsing") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "basket.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "basket.xsd")))
     val expectedSchema = buildSchema(
       field("basket",
         structField(
@@ -40,8 +41,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Relative path parsing") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "include-example/first.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "include-example/first.xsd")))
     val expectedSchema = buildSchema(
       field("basket",
         structField(
@@ -52,8 +52,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Test schema types and attributes") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "catalog.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "catalog.xsd")))
     val expectedSchema = buildSchema(
       field("catalog",
         structField(
@@ -76,23 +75,20 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Test xs:choice nullability") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "choice.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "choice.xsd")))
     val expectedSchema = buildSchema(
       field("el", structField(field("foo"), field("bar"), field("baz")), nullable = false))
     assert(expectedSchema === parsedSchema)
   }
 
   test("Two root elements") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "twoelements.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "twoelements.xsd")))
     val expectedSchema = buildSchema(field("bar", nullable = false), field("foo", nullable = false))
     assert(expectedSchema === parsedSchema)
   }
 
   test("xs:any schema") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "xsany.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "xsany.xsd")))
     val expectedSchema = buildSchema(
       field("root",
         structField(
@@ -117,8 +113,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Tests xs:long type / Issue 520") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "long.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "long.xsd")))
     val expectedSchema = buildSchema(
       field("test",
         structField(field("userId", LongType, nullable = false)), nullable = false))
@@ -126,8 +121,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Test xs:decimal type with restriction[fractionalDigits]") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
-      "decimal-with-restriction.xsd").replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "decimal-with-restriction.xsd")))
     val expectedSchema = buildSchema(
       field("decimal_type_3", DecimalType(12, 6), nullable = false),
       field("decimal_type_1", DecimalType(38, 18), nullable = false),
@@ -137,8 +131,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Test ref attribute / Issue 617") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "ref-attribute.xsd")
-      .replace("file:/", "/")))
+    val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "ref-attribute.xsd")))
     val expectedSchema = buildSchema(
       field(
         "book",
@@ -166,8 +159,8 @@ class XSDToSchemaSuite extends SharedSparkSession {
   }
 
   test("Test complex content with extension element / Issue 554") {
-    val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
-      "complex-content-extension.xsd").replace("file:/", "/")))
+    val parsedSchema =
+      XSDToSchema.read(new Path(testFile(resDir + "complex-content-extension.xsd")))
 
     val expectedSchema = buildSchema(
       field(
@@ -184,4 +177,10 @@ class XSDToSchemaSuite extends SharedSparkSession {
     )
     assert(parsedSchema === expectedSchema)
   }
+
+  test("SPARK-45912: Test XSDToSchema when open not found files") {
+    intercept[FileNotFoundException] {
+      XSDToSchema.read(new Path("/path/not/found"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
