This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 89041a4a8c7 [SPARK-44788][CONNECT][PYTHON][SQL] Add from_xml and schema_of_xml to pyspark, spark connect and sql function
89041a4a8c7 is described below

commit 89041a4a8c7b7787fa10f090d4324f20447c4dd3
Author: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
AuthorDate: Mon Sep 18 16:35:22 2023 +0900

    [SPARK-44788][CONNECT][PYTHON][SQL] Add from_xml and schema_of_xml to pyspark, spark connect and sql function
    
    ### What changes were proposed in this pull request?
    Add from_xml and schema_of_xml to PySpark, Spark Connect, and the SQL function registry.
    
    ### Why are the changes needed?
    from_xml parses XML data nested in a `Column` into a struct. schema_of_xml infers a schema from XML data in a `Column`.
    This PR adds these two functions to PySpark, Spark Connect, and the SQL function registry.
    It is one of a series of PRs adding native support for the [XML File Format](https://issues.apache.org/jira/browse/SPARK-44265) in Spark.
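
    A minimal PySpark sketch of what the new API enables (the sample XML literal and column names are illustrative only, taken from the doctests below):

    ```python
    from pyspark.sql.functions import from_xml, schema_of_xml, lit

    df = spark.createDataFrame([(1, "<p><a>1</a></p>")], ("key", "value"))
    # Infer a schema Column from a sample record, then parse the XML column into a struct.
    schema = schema_of_xml(lit("<p><a>1</a></p>"))
    df.select(from_xml(df.value, schema).alias("xml")).collect()
    ```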
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it adds from_xml and schema_of_xml to PySpark, Spark Connect, and the SQL function registry.
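
    Because the functions are also registered in SQL, they can be called from SQL directly (a sketch mirroring the new golden-file tests):

    ```python
    # Both calls go through the newly registered SQL functions.
    spark.sql("SELECT from_xml('<p><a>1</a></p>', 'a INT')").show()
    spark.sql("SELECT schema_of_xml('<p><a>1</a></p>')").show()
    ```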
    
    ### How was this patch tested?
    - Added new unit tests
    - GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #42938 from sandip-db/from_xml-master.
    
    Authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/sql/functions.scala     |  51 +++-
 .../org/apache/spark/sql/FunctionTestSuite.scala   |  10 +
 python/pyspark/errors/error_classes.py             |   5 +
 python/pyspark/sql/connect/functions.py            |  46 +++
 python/pyspark/sql/functions.py                    | 129 +++++++++
 .../sql/tests/connect/test_connect_function.py     | 106 +++++++
 python/pyspark/sql/tests/test_functions.py         |  28 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   6 +-
 .../sql/catalyst/expressions/xmlExpressions.scala  |  26 +-
 .../scala/org/apache/spark/sql/functions.scala     |  60 +++-
 .../sql-functions/sql-expression-schema.md         |   4 +-
 .../analyzer-results/xml-functions.sql.out         | 271 +++++++++++++++++
 .../resources/sql-tests/inputs/xml-functions.sql   |  50 ++++
 .../sql-tests/results/xml-functions.sql.out        | 319 +++++++++++++++++++++
 .../apache/spark/sql/DataFrameFunctionsSuite.scala |   5 +-
 .../sql/execution/datasources/xml/XmlSuite.scala   |   2 +-
 16 files changed, 1093 insertions(+), 25 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 83f0ee64501..b94a33007b1 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.api.java._
 import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.PrimitiveLongEncoder
 import org.apache.spark.sql.connect.common.LiteralValueProtoConverter._
 import org.apache.spark.sql.connect.common.UdfUtils
+import org.apache.spark.sql.errors.DataTypeErrors
 import org.apache.spark.sql.expressions.{ScalarUserDefinedFunction, 
UserDefinedFunction}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.types.DataType.parseTypeWithFallback
@@ -7311,15 +7312,59 @@ object functions {
    *   
"https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option";>
 Data
    *   Source Option</a> in the version you use.
    * @group collection_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
+    from_xml(e, lit(schema.json), options.asScala.toIterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing an XML string into a `StructType` with the
+   * specified schema. Returns `null` in the case of an unparseable string.
    *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema as a DDL-formatted string.
+   * @param options
+   *   options to control how the XML is parsed. Accepts the same options as the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
    * @since 4.0.0
    */
   // scalastyle:on line.size.limit
-  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
-    from_xml(e, lit(schema.toDDL), options.iterator)
+  def from_xml(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
+    val dataType =
+      parseTypeWithFallback(schema, DataType.fromJson, fallbackParser = DataType.fromDDL)
+    val structType = dataType match {
+      case t: StructType => t
+      case _ => throw DataTypeErrors.failedParsingStructTypeError(schema)
+    }
+    from_xml(e, structType, options)
+  }
 
   // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing an XML string into a `StructType` with the
+   * specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column): Column = {
+    from_xml(e, schema, Iterator.empty)
+  }
 
+  // scalastyle:off line.size.limit
   /**
    * (Java-specific) Parses a column containing a XML string into the data 
type corresponding to
    * the specified schema. Returns `null`, in the case of an unparseable 
string.
@@ -7354,7 +7399,7 @@ object functions {
    * @since 4.0.0
    */
   def from_xml(e: Column, schema: StructType): Column =
-    from_xml(e, schema, Map.empty[String, String])
+    from_xml(e, schema, Map.empty[String, String].asJava)
 
   private def from_xml(e: Column, schema: Column, options: Iterator[(String, 
String)]): Column = {
     fnWithOptions("from_xml", options, e, schema)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
index 78cc26d627c..1240fbc9ada 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
@@ -229,6 +229,16 @@ class FunctionTestSuite extends ConnectFunSuite {
     schema_of_csv("x,y"),
     schema_of_csv(lit("x,y"), Collections.emptyMap()))
   testEquals("to_csv", to_csv(a), to_csv(a, Collections.emptyMap[String, 
String]))
+  testEquals(
+    "from_xml",
+    from_xml(a, schema),
+    from_xml(a, lit(schema.json)),
+    from_xml(a, schema.json, Collections.emptyMap[String, String]),
+    from_xml(a, schema.json, Map.empty[String, String].asJava),
+    from_xml(a, schema, Map.empty[String, String].asJava),
+    from_xml(a, schema, Collections.emptyMap[String, String]),
+    from_xml(a, lit(schema.json), Collections.emptyMap[String, String]))
+
   testEquals(
     "from_avro",
     avroFn.from_avro(a, """{"type": "int", "name": "id"}"""),
diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index c98e9feb610..eaefbf4d576 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -477,6 +477,11 @@ ERROR_CLASSES_JSON = """
       "Argument `<arg_name>` should be a Column or str, got <arg_type>."
     ]
   },
+  "NOT_COLUMN_OR_STR_OR_STRUCT" : {
+    "message" : [
+      "Argument `<arg_name>` should be a StructType, Column or str, got 
<arg_type>."
+    ]
+  },
   "NOT_DATAFRAME" : {
     "message" : [
       "Argument `<arg_name>` should be a DataFrame, got <arg_type>."
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index f89b1aae500..24b552a45e6 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -1855,6 +1855,32 @@ def from_json(
 from_json.__doc__ = pysparkfuncs.from_json.__doc__
 
 
+def from_xml(
+    col: "ColumnOrName",
+    schema: Union[StructType, Column, str],
+    options: Optional[Dict[str, str]] = None,
+) -> Column:
+    if isinstance(schema, Column):
+        _schema = schema
+    elif isinstance(schema, StructType):
+        _schema = lit(schema.json())
+    elif isinstance(schema, str):
+        _schema = lit(schema)
+    else:
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_STR_OR_STRUCT",
+            message_parameters={"arg_name": "schema", "arg_type": type(schema).__name__},
+        )
+
+    if options is None:
+        return _invoke_function("from_xml", _to_col(col), _schema)
+    else:
+        return _invoke_function("from_xml", _to_col(col), _schema, 
_options_to_col(options))
+
+
+from_xml.__doc__ = pysparkfuncs.from_xml.__doc__
+
+
 def get(col: "ColumnOrName", index: Union["ColumnOrName", int]) -> Column:
     index = lit(index) if isinstance(index, int) else index
 
@@ -2064,6 +2090,26 @@ def schema_of_json(json: "ColumnOrName", options: Optional[Dict[str, str]] = Non
 schema_of_json.__doc__ = pysparkfuncs.schema_of_json.__doc__
 
 
+def schema_of_xml(xml: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
+    if isinstance(xml, Column):
+        _xml = xml
+    elif isinstance(xml, str):
+        _xml = lit(xml)
+    else:
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_STR",
+            message_parameters={"arg_name": "xml", "arg_type": type(xml).__name__},
+        )
+
+    if options is None:
+        return _invoke_function("schema_of_xml", _xml)
+    else:
+        return _invoke_function("schema_of_xml", _xml, 
_options_to_col(options))
+
+
+schema_of_xml.__doc__ = pysparkfuncs.schema_of_xml.__doc__
+
+
 def shuffle(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("shuffle", col)
 
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 1e12b9bf469..3c65e8d9162 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -13058,6 +13058,135 @@ def json_object_keys(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("json_object_keys", col)
 
 
+@_try_remote_functions
+def from_xml(
+    col: "ColumnOrName",
+    schema: Union[StructType, Column, str],
+    options: Optional[Dict[str, str]] = None,
+) -> Column:
+    """
+    Parses a column containing an XML string into a row with
+    the specified schema. Returns `null` in the case of an unparseable string.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        a column or column name in XML format
+    schema : :class:`StructType`, :class:`~pyspark.sql.Column` or str
+        a StructType, Column or Python string literal with a DDL-formatted string
+        to use when parsing the XML column
+    options : dict, optional
+        options to control parsing. Accepts the same options as the XML datasource.
+        See `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option>`_
+        for the version you use.
+
+        .. # noqa
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a new column of complex type from given XML object.
+
+    Examples
+    --------
+    >>> from pyspark.sql.types import *
+    >>> from pyspark.sql.functions import from_xml, schema_of_xml, lit
+
+    StructType input with simple IntegerType.
+
+    >>> data = [(1, '''<p><a>1</a></p>''')]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+
+    TODO: Fix StructType for spark connect
+    schema = StructType([StructField("a", IntegerType())])
+
+    >>> schema = "STRUCT<a: BIGINT>"
+    >>> df.select(from_xml(df.value, schema).alias("xml")).collect()
+    [Row(xml=Row(a=1))]
+
+    String input.
+
+    >>> df.select(from_xml(df.value, "a INT").alias("xml")).collect()
+    [Row(xml=Row(a=1))]
+
+    >>> data = [(1, '<p><a>1</a><a>2</a></p>')]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+
+    TODO: Fix StructType for spark connect
+    schema = StructType([StructField("a", ArrayType(IntegerType()))])
+
+    >>> schema = "STRUCT<a: ARRAY<BIGINT>>"
+    >>> df.select(from_xml(df.value, schema).alias("xml")).collect()
+    [Row(xml=Row(a=[1, 2]))]
+
+    Column input generated by schema_of_xml.
+
+    >>> schema = schema_of_xml(lit(data[0][1]))
+    >>> df.select(from_xml(df.value, schema).alias("xml")).collect()
+    [Row(xml=Row(a=[1, 2]))]
+    """
+
+    if isinstance(schema, StructType):
+        schema = schema.json()
+    elif isinstance(schema, Column):
+        schema = _to_java_column(schema)
+    elif not isinstance(schema, str):
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_STR_OR_STRUCT",
+            message_parameters={"arg_name": "schema", "arg_type": type(schema).__name__},
+        )
+    return _invoke_function("from_xml", _to_java_column(col), schema, _options_to_str(options))
+
+
+@_try_remote_functions
+def schema_of_xml(xml: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
+    """
+    Parses an XML string and infers its schema in DDL format.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    xml : :class:`~pyspark.sql.Column` or str
+        an XML string or a foldable string column containing an XML string.
+    options : dict, optional
+        options to control parsing. Accepts the same options as the XML datasource.
+        See `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option>`_
+        for the version you use.
+
+        .. # noqa
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a string representation of a :class:`StructType` parsed from given XML.
+
+    Examples
+    --------
+    >>> df = spark.range(1)
+    >>> df.select(schema_of_xml(lit('<p><a>1</a></p>')).alias("xml")).collect()
+    [Row(xml='STRUCT<a: BIGINT>')]
+    >>> df.select(schema_of_xml(lit('<p><a>1</a><a>2</a></p>')).alias("xml")).collect()
+    [Row(xml='STRUCT<a: ARRAY<BIGINT>>')]
+    >>> schema = schema_of_xml('<p><a attr="2">1</a></p>', {'excludeAttribute':'true'})
+    >>> df.select(schema.alias("xml")).collect()
+    [Row(xml='STRUCT<a: BIGINT>')]
+    """
+    if isinstance(xml, str):
+        col = _create_column_from_literal(xml)
+    elif isinstance(xml, Column):
+        col = _to_java_column(xml)
+    else:
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_STR",
+            message_parameters={"arg_name": "xml", "arg_type": type(xml).__name__},
+        )
+
+    return _invoke_function("schema_of_xml", col, _options_to_str(options))
+
+
 @_try_remote_functions
 def schema_of_csv(csv: "ColumnOrName", options: Optional[Dict[str, str]] = 
None) -> Column:
     """
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index f5aaf535de7..f958b5fb574 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -1817,6 +1817,112 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S
             sdf.select(SF.to_json(SF.struct(SF.lit("a"), SF.lit("b")), 
{"mode": "FAILFAST"})),
         )
 
+    def test_xml_functions(self):
+        query = """
+            SELECT * FROM VALUES
+            ('<p><a>1</a></p>', '<p><a>1</a><a>2</a><a>3</a></p>',
+            '<p><a attr="s"><b>5.0</b></a></p>'),
+            ('<p><a>0</a></p>', '<p><a>4</a><a>5</a><a>6</a></p>', '<p><a attr="t"></a></p>')
+            AS tab(a, b, c)
+            """
+        # +---------------+-------------------------------+---------------------------------+
+        # |              a|                              b|                                c|
+        # +---------------+-------------------------------+---------------------------------+
+        # |<p><a>1</a></p>|<p><a>1</a><a>2</a><a>3</a></p>|<p><a attr="s"><b>5.0</b></a></p>|
+        # |<p><a>0</a></p>|<p><a>4</a><a>5</a><a>6</a></p>|          <p><a attr="t"></a></p>|
+        # +---------------+-------------------------------+---------------------------------+
+
+        cdf = self.connect.sql(query)
+        sdf = self.spark.sql(query)
+
+        # test from_xml
+        # TODO(SPARK-45190): Address StructType schema parse error
+        for schema in [
+            "a INT",
+            # StructType([StructField("a", IntegerType())]),
+            # StructType([StructField("a", ArrayType(IntegerType()))]),
+        ]:
+            self.compare_by_show(
+                cdf.select(CF.from_xml(cdf.a, schema)),
+                sdf.select(SF.from_xml(sdf.a, schema)),
+            )
+            self.compare_by_show(
+                cdf.select(CF.from_xml("a", schema)),
+                sdf.select(SF.from_xml("a", schema)),
+            )
+            self.compare_by_show(
+                cdf.select(CF.from_xml(cdf.a, schema, {"mode": "FAILFAST"})),
+                sdf.select(SF.from_xml(sdf.a, schema, {"mode": "FAILFAST"})),
+            )
+            self.compare_by_show(
+                cdf.select(CF.from_xml("a", schema, {"mode": "FAILFAST"})),
+                sdf.select(SF.from_xml("a", schema, {"mode": "FAILFAST"})),
+            )
+
+        for schema in [
+            "STRUCT<a: ARRAY<INT>>",
+            # StructType([StructField("a", ArrayType(IntegerType()))]),
+        ]:
+            self.compare_by_show(
+                cdf.select(CF.from_xml(cdf.b, schema)),
+                sdf.select(SF.from_xml(sdf.b, schema)),
+            )
+            self.compare_by_show(
+                cdf.select(CF.from_xml("b", schema)),
+                sdf.select(SF.from_xml("b", schema)),
+            )
+            self.compare_by_show(
+                cdf.select(CF.from_xml(cdf.b, schema, {"mode": "FAILFAST"})),
+                sdf.select(SF.from_xml(sdf.b, schema, {"mode": "FAILFAST"})),
+            )
+            self.compare_by_show(
+                cdf.select(CF.from_xml("b", schema, {"mode": "FAILFAST"})),
+                sdf.select(SF.from_xml("b", schema, {"mode": "FAILFAST"})),
+            )
+
+        c_schema = CF.schema_of_xml(CF.lit("""<p><a>1</a></p>"""))
+        s_schema = SF.schema_of_xml(SF.lit("""<p><a>1</a></p>"""))
+
+        self.compare_by_show(
+            cdf.select(CF.from_xml(cdf.a, c_schema)),
+            sdf.select(SF.from_xml(sdf.a, s_schema)),
+        )
+        self.compare_by_show(
+            cdf.select(CF.from_xml("a", c_schema)),
+            sdf.select(SF.from_xml("a", s_schema)),
+        )
+        self.compare_by_show(
+            cdf.select(CF.from_xml(cdf.a, c_schema, {"mode": "FAILFAST"})),
+            sdf.select(SF.from_xml(sdf.a, s_schema, {"mode": "FAILFAST"})),
+        )
+        self.compare_by_show(
+            cdf.select(CF.from_xml("a", c_schema, {"mode": "FAILFAST"})),
+            sdf.select(SF.from_xml("a", s_schema, {"mode": "FAILFAST"})),
+        )
+
+        with self.assertRaises(PySparkTypeError) as pe:
+            CF.from_xml("a", [c_schema])
+
+        self.check_error(
+            exception=pe.exception,
+            error_class="NOT_COLUMN_OR_STR_OR_STRUCT",
+            message_parameters={"arg_name": "schema", "arg_type": "list"},
+        )
+
+        # test schema_of_xml
+        self.assert_eq(
+            cdf.select(CF.schema_of_xml(CF.lit("<p><a>1</a></p>"))).toPandas(),
+            sdf.select(SF.schema_of_xml(SF.lit("<p><a>1</a></p>"))).toPandas(),
+        )
+        self.assert_eq(
+            cdf.select(
+                CF.schema_of_xml(CF.lit("<p><a>1</a></p>"), {"mode": "FAILFAST"})
+            ).toPandas(),
+            sdf.select(
+                SF.schema_of_xml(SF.lit("<p><a>1</a></p>"), {"mode": "FAILFAST"})
+            ).toPandas(),
+        )
+
     def test_string_functions_one_arg(self):
         query = """
             SELECT * FROM VALUES
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index e643a6c07fa..b0ad311d733 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -82,12 +82,7 @@ class FunctionsTestsMixin:
         missing_in_py = jvm_fn_set.difference(py_fn_set)
 
         # Functions that we expect to be missing in python until they are added to pyspark
-        expected_missing_in_py = {
-            # TODO: XML functions will soon be added and removed from this list
-            # https://issues.apache.org/jira/browse/SPARK-44788
-            "from_xml",
-            "schema_of_xml",
-        }
+        expected_missing_in_py = set()
 
         self.assertEqual(
             expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected"
@@ -1286,6 +1281,27 @@ class FunctionsTestsMixin:
             message_parameters={"arg_name": "schema", "arg_type": "int"},
         )
 
+    def test_schema_of_xml(self):
+        with self.assertRaises(PySparkTypeError) as pe:
+            F.schema_of_xml(1)
+
+        self.check_error(
+            exception=pe.exception,
+            error_class="NOT_COLUMN_OR_STR",
+            message_parameters={"arg_name": "xml", "arg_type": "int"},
+        )
+
+    def test_from_xml(self):
+        df = self.spark.range(10)
+        with self.assertRaises(PySparkTypeError) as pe:
+            F.from_xml(df.id, 1)
+
+        self.check_error(
+            exception=pe.exception,
+            error_class="NOT_COLUMN_OR_STR_OR_STRUCT",
+            message_parameters={"arg_name": "schema", "arg_type": "int"},
+        )
+
     def test_greatest(self):
         df = self.spark.range(10)
         with self.assertRaises(PySparkValueError) as pe:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index af9c095deb9..8be3199ef9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -830,7 +830,11 @@ object FunctionRegistry {
     // csv
     expression[CsvToStructs]("from_csv"),
     expression[SchemaOfCsv]("schema_of_csv"),
-    expression[StructsToCsv]("to_csv")
+    expression[StructsToCsv]("to_csv"),
+
+    // Xml
+    expression[XmlToStructs]("from_xml"),
+    expression[SchemaOfXml]("schema_of_xml")
   )
 
   val builtin: SimpleFunctionRegistry = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
index ddc63b25903..c0fd725943d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
@@ -28,6 +28,26 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+/**
+ * Converts an XML input string to a [[StructType]] with the specified schema.
+ * It is assumed that the XML input string constitutes a single record, so the
+ * [[rowTag]] option will not be applicable.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(xmlStr, schema[, options]) - Returns a struct value with the 
given `xmlStr` and `schema`.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('<p><a>1</a><b>0.8</b></p>', 'a INT, b DOUBLE');
+       {"a":1,"b":0.8}
+      > SELECT _FUNC_('<p><time>26/08/2015</time></p>', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+       {"time":2015-08-26 00:00:00}
+      > SELECT _FUNC_('<p><teacher>Alice</teacher><student><name>Bob</name><rank>1</rank></student><student><name>Charlie</name><rank>2</rank></student></p>', 'STRUCT<teacher: STRING, student: ARRAY<STRUCT<name: STRING, rank: INT>>>');
+       {"teacher":"Alice","student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2}]}
+  """,
+  group = "xml_funcs",
+  since = "4.0.0")
+// scalastyle:on line.size.limit
 case class XmlToStructs(
     schema: DataType,
     options: Map[String, String],
@@ -138,8 +158,10 @@ case class XmlToStructs(
   usage = "_FUNC_(xml[, options]) - Returns schema in the DDL format of XML 
string.",
   examples = """
     Examples:
-      > SELECT _FUNC_('1,abc');
-       STRUCT<_c0: INT, _c1: STRING>
+      > SELECT _FUNC_('<p><a>1</a></p>');
+       STRUCT<a: BIGINT>
+      > SELECT _FUNC_('<p><a attr="2">1</a><a>3</a></p>', map('excludeAttribute', 'true'));
+       STRUCT<a: ARRAY<BIGINT>>
   """,
   since = "4.0.0",
   group = "xml_funcs")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index dcde01ec408..a4b8f1b1b68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.xml._
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, 
ResolvedHint}
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, 
TimestampFormatter}
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{DataTypeErrors, QueryCompilationErrors}
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.{Aggregator, SparkUserDefinedFunction, 
UserDefinedAggregator, UserDefinedFunction}
 import org.apache.spark.sql.internal.SQLConf
@@ -7381,15 +7381,61 @@ object functions {
    *                
"https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option";>
    *                Data Source Option</a> in the version you use.
    * @group collection_funcs
-   * @since
+   * @since 4.0.0
    */
   // scalastyle:on line.size.limit
-  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
-    XmlToStructs(CharVarcharUtils.failIfHasCharVarchar(schema), options, e.expr)
+  def from_xml(e: Column, schema: StructType, options: java.util.Map[String, String]): Column = {
+    withExpr(XmlToStructs(CharVarcharUtils.failIfHasCharVarchar(schema),
+      options.asScala.toMap, e.expr))
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Java-specific) Parses a column containing an XML string into a `StructType`
+   * with the specified schema.
+   * Returns `null` in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema as a DDL-formatted string.
+   * @param options options to control how the XML is parsed. Accepts the same options as
+   *                the XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
+    val dataType = parseTypeWithFallback(
+      schema,
+      DataType.fromJson,
+      fallbackParser = DataType.fromDDL)
+    val structType = dataType match {
+      case t: StructType => t
+      case _ => throw DataTypeErrors.failedParsingStructTypeError(schema)
+    }
+    from_xml(e, structType, options)
   }
 
   // scalastyle:off line.size.limit
 
+  /**
+   * Parses a column containing an XML string into a `StructType`
+   * with the specified schema. Returns `null` in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column): Column = {
+    from_xml(e, schema, Map.empty[String, String].asJava)
+  }
+
+  // scalastyle:off line.size.limit
   /**
    * (Java-specific) Parses a column containing a XML string into a 
`StructType`
    * with the specified schema. Returns `null`, in the case of an unparseable 
string.
@@ -7403,7 +7449,7 @@ object functions {
    *                
"https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option";>
    *                Data Source Option</a> in the version you use.
    * @group collection_funcs
-   * @since
+   * @since 4.0.0
    */
   // scalastyle:on line.size.limit
   def from_xml(e: Column, schema: Column, options: java.util.Map[String, 
String]): Column = {
@@ -7419,10 +7465,10 @@ object functions {
    * @param schema  the schema to use when parsing the XML string
 
    * @group collection_funcs
-   * @since
+   * @since 4.0.0
    */
   def from_xml(e: Column, schema: StructType): Column =
-    from_xml(e, schema, Map.empty[String, String])
+    from_xml(e, schema, Map.empty[String, String].asJava)
 
   /**
    * Parses a XML string and infers its schema in DDL format.
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 9e06d5ac58a..d21ceaeb14b 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -426,6 +426,7 @@
 | org.apache.spark.sql.catalyst.expressions.aggregate.VariancePop | var_pop | 
SELECT var_pop(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct<var_pop(col):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.VarianceSamp | var_samp 
| SELECT var_samp(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct<var_samp(col):double> |
 | org.apache.spark.sql.catalyst.expressions.aggregate.VarianceSamp | variance 
| SELECT variance(col) FROM VALUES (1), (2), (3) AS tab(col) | 
struct<variance(col):double> |
+| org.apache.spark.sql.catalyst.expressions.xml.SchemaOfXml | schema_of_xml | SELECT schema_of_xml('<p><a>1</a></p>') | struct<schema_of_xml(<p><a>1</a></p>):string> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathBoolean | xpath_boolean | 
SELECT xpath_boolean('<a><b>1</b></a>','a/b') | 
struct<xpath_boolean(<a><b>1</b></a>, a/b):boolean> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathDouble | xpath_double | 
SELECT xpath_double('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | 
struct<xpath_double(<a><b>1</b><b>2</b></a>, sum(a/b)):double> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathDouble | xpath_number | 
SELECT xpath_number('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | 
struct<xpath_number(<a><b>1</b><b>2</b></a>, sum(a/b)):double> |
@@ -434,4 +435,5 @@
 | org.apache.spark.sql.catalyst.expressions.xml.XPathList | xpath | SELECT 
xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()') | 
struct<xpath(<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>, 
a/b/text()):array<string>> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathLong | xpath_long | 
SELECT xpath_long('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | 
struct<xpath_long(<a><b>1</b><b>2</b></a>, sum(a/b)):bigint> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathShort | xpath_short | 
SELECT xpath_short('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | 
struct<xpath_short(<a><b>1</b><b>2</b></a>, sum(a/b)):smallint> |
-| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | 
SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | 
struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
\ No newline at end of file
+| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
+| org.apache.spark.sql.catalyst.expressions.xml.XmlToStructs | from_xml | SELECT from_xml('<p><a>1</a><b>0.8</b></p>', 'a INT, b DOUBLE') | struct<from_xml(<p><a>1</a><b>0.8</b></p>):struct<a:int,b:double>> |
\ No newline at end of file
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out
new file mode 100644
index 00000000000..e62f4aab344
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out
@@ -0,0 +1,271 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+select from_xml('<p><a>1</a></p>', 'a INT')
+-- !query analysis
+Project [from_xml(StructField(a,IntegerType,true), <p><a>1</a></p>, 
Some(America/Los_Angeles)) AS from_xml(<p><a>1</a></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml('<p><time>26/08/2015</time></p>', 'time Timestamp', 
map('timestampFormat', 'dd/MM/yyyy'))
+-- !query analysis
+Project [from_xml(StructField(time,TimestampType,true), 
(timestampFormat,dd/MM/yyyy), <p><time>26/08/2015</time></p>, 
Some(America/Los_Angeles)) AS from_xml(<p><time>26/08/2015</time></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml('<p><a>1</a></p>', 1)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_SCHEMA.NON_STRING_LITERAL",
+  "sqlState" : "42K07",
+  "messageParameters" : {
+    "inputSchema" : "\"1\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 37,
+    "fragment" : "from_xml('<p><a>1</a></p>', 1)"
+  } ]
+}
+
+
+-- !query
+select from_xml('<p><a>1</a></p>', 'a InvalidType')
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'InvalidType'",
+    "hint" : ": extra input 'InvalidType'"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 51,
+    "fragment" : "from_xml('<p><a>1</a></p>', 'a InvalidType')"
+  } ]
+}
+
+
+-- !query
+select from_xml('<p><a>1</a></p>', 'a INT', named_struct('mode', 'PERMISSIVE'))
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_OPTIONS.NON_MAP_FUNCTION",
+  "sqlState" : "42K06",
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 79,
+    "fragment" : "from_xml('<p><a>1</a></p>', 'a INT', named_struct('mode', 
'PERMISSIVE'))"
+  } ]
+}
+
+
+-- !query
+select from_xml('<p><a>1</a></p>', 'a INT', map('mode', 1))
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_OPTIONS.NON_STRING_TYPE",
+  "sqlState" : "42K06",
+  "messageParameters" : {
+    "mapType" : "\"MAP<STRING, INT>\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 59,
+    "fragment" : "from_xml('<p><a>1</a></p>', 'a INT', map('mode', 1))"
+  } ]
+}
+
+
+-- !query
+select from_xml()
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "0",
+    "docroot" : "https://spark.apache.org/docs/latest";,
+    "expectedNum" : "[2, 3]",
+    "functionName" : "`from_xml`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 17,
+    "fragment" : "from_xml()"
+  } ]
+}
+
+
+-- !query
+DROP VIEW IF EXISTS xmlTable
+-- !query analysis
+DropTableCommand `spark_catalog`.`default`.`xmlTable`, true, true, false
+
+
+-- !query
+select from_xml('<p><a>1</a></p>', 'struct<a:array<int>>')
+-- !query analysis
+Project [from_xml(StructField(a,ArrayType(IntegerType,true),true), 
<p><a>1</a></p>, Some(America/Los_Angeles)) AS from_xml(<p><a>1</a></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml('<p><a>1</a><b>"2"</b></p>', 'struct<a:int,b:string>')
+-- !query analysis
+Project [from_xml(StructField(a,IntegerType,true), 
StructField(b,StringType,true), <p><a>1</a><b>"2"</b></p>, 
Some(America/Los_Angeles)) AS from_xml(<p><a>1</a><b>"2"</b></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select schema_of_xml('<p><a>1</a><b>"2"</b></p>')
+-- !query analysis
+Project [schema_of_xml(<p><a>1</a><b>"2"</b></p>) AS 
schema_of_xml(<p><a>1</a><b>"2"</b></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml('<p><a>1</a><a>2</a><a>3</a></p>', 
schema_of_xml('<p><a>1</a><a>2</a></p>'))
+-- !query analysis
+Project [from_xml(StructField(a,ArrayType(LongType,true),true), 
<p><a>1</a><a>2</a><a>3</a></p>, Some(America/Los_Angeles)) AS 
from_xml(<p><a>1</a><a>2</a><a>3</a></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml('<p><a>1</a><a>2</a></p>', 'struct<a:array<int>>')
+-- !query analysis
+Project [from_xml(StructField(a,ArrayType(IntegerType,true),true), 
<p><a>1</a><a>2</a></p>, Some(America/Los_Angeles)) AS 
from_xml(<p><a>1</a><a>2</a></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml('<p><a>1</a><a>"2"</a></p>', 'struct<a:array<int>>')
+-- !query analysis
+Project [from_xml(StructField(a,ArrayType(IntegerType,true),true), 
<p><a>1</a><a>"2"</a></p>, Some(America/Los_Angeles)) AS 
from_xml(<p><a>1</a><a>"2"</a></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml('<p><a>1</a><a></a></p>', 'struct<a:array<int>>')
+-- !query analysis
+Project [from_xml(StructField(a,ArrayType(IntegerType,true),true), 
<p><a>1</a><a></a></p>, Some(America/Los_Angeles)) AS 
from_xml(<p><a>1</a><a></a></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml('<p><a attr="1"><b>2</b></a></p>', 'struct<a:map<string,int>>')
+-- !query analysis
+Project [from_xml(StructField(a,MapType(StringType,IntegerType,true),true), 
<p><a attr="1"><b>2</b></a></p>, Some(America/Los_Angeles)) AS from_xml(<p><a 
attr="1"><b>2</b></a></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml('<p><d>2012-12-15</d><t>2012-12-15 15:15:15</t></p>', 'd date, 
t timestamp')
+-- !query analysis
+Project [from_xml(StructField(d,DateType,true), 
StructField(t,TimestampType,true), <p><d>2012-12-15</d><t>2012-12-15 
15:15:15</t></p>, Some(America/Los_Angeles)) AS 
from_xml(<p><d>2012-12-15</d><t>2012-12-15 15:15:15</t></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml(
+  '<p><d>12/15 2012</d><t>12/15 2012 15:15:15</t>}</p>',
+  'd date, t timestamp',
+  map('dateFormat', 'MM/dd yyyy', 'timestampFormat', 'MM/dd yyyy HH:mm:ss'))
+-- !query analysis
+Project [from_xml(StructField(d,DateType,true), 
StructField(t,TimestampType,true), (dateFormat,MM/dd yyyy), 
(timestampFormat,MM/dd yyyy HH:mm:ss), <p><d>12/15 2012</d><t>12/15 2012 
15:15:15</t>}</p>, Some(America/Los_Angeles)) AS from_xml(<p><d>12/15 
2012</d><t>12/15 2012 15:15:15</t>}</p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml(
+  '<p><d>02-29</d></p>',
+  'd date',
+  map('dateFormat', 'MM-dd'))
+-- !query analysis
+Project [from_xml(StructField(d,DateType,true), (dateFormat,MM-dd), 
<p><d>02-29</d></p>, Some(America/Los_Angeles)) AS 
from_xml(<p><d>02-29</d></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select from_xml(
+  '<p><t>02-29</t></p>',
+  't timestamp',
+  map('timestampFormat', 'MM-dd'))
+-- !query analysis
+Project [from_xml(StructField(t,TimestampType,true), (timestampFormat,MM-dd), 
<p><t>02-29</t></p>, Some(America/Los_Angeles)) AS 
from_xml(<p><t>02-29</t></p>)#x]
++- OneRowRelation
+
+
+-- !query
+select schema_of_xml(null)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_NULL",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprName" : "xml",
+    "sqlExpr" : "\"schema_of_xml(NULL)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 26,
+    "fragment" : "schema_of_xml(null)"
+  } ]
+}
+
+
+-- !query
+CREATE TEMPORARY VIEW xmlTable(xmlField, a) AS SELECT * FROM VALUES 
('<p><a>1</a><b>"2"</b></p>', 'a')
+-- !query analysis
+CreateViewCommand `xmlTable`, [(xmlField,None), (a,None)], SELECT * FROM 
VALUES ('<p><a>1</a><b>"2"</b></p>', 'a'), false, false, LocalTempView, true
+   +- Project [col1#x, col2#x]
+      +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+SELECT schema_of_xml(xmlField) FROM xmlTable
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputExpr" : "\"xmlField\"",
+    "inputName" : "`xml`",
+    "inputType" : "\"STRING\"",
+    "sqlExpr" : "\"schema_of_xml(xmlField)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 30,
+    "fragment" : "schema_of_xml(xmlField)"
+  } ]
+}
+
+
+-- !query
+DROP VIEW IF EXISTS xmlTable
+-- !query analysis
+DropTempViewCommand xmlTable
diff --git a/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql
new file mode 100644
index 00000000000..cdf56712b11
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql
@@ -0,0 +1,50 @@
+-- from_xml
+select from_xml('<p><a>1</a></p>', 'a INT');
+select from_xml('<p><time>26/08/2015</time></p>', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+-- Check if errors are handled
+select from_xml('<p><a>1</a></p>', 1);
+select from_xml('<p><a>1</a></p>', 'a InvalidType');
+select from_xml('<p><a>1</a></p>', 'a INT', named_struct('mode', 'PERMISSIVE'));
+select from_xml('<p><a>1</a></p>', 'a INT', map('mode', 1));
+select from_xml();
+
+-- Clean up
+DROP VIEW IF EXISTS xmlTable;
+
+-- from_xml - complex types
+select from_xml('<p><a>1</a></p>', 'struct<a:array<int>>');
+select from_xml('<p><a>1</a><b>"2"</b></p>', 'struct<a:int,b:string>');
+
+-- infer schema of xml literal
+select schema_of_xml('<p><a>1</a><b>"2"</b></p>');
+select from_xml('<p><a>1</a><a>2</a><a>3</a></p>', schema_of_xml('<p><a>1</a><a>2</a></p>'));
+
+-- from_xml - array type
+select from_xml('<p><a>1</a><a>2</a></p>', 'struct<a:array<int>>');
+select from_xml('<p><a>1</a><a>"2"</a></p>', 'struct<a:array<int>>');
+select from_xml('<p><a>1</a><a></a></p>', 'struct<a:array<int>>');
+
+select from_xml('<p><a attr="1"><b>2</b></a></p>', 'struct<a:map<string,int>>');
+
+-- from_xml - datetime type
+select from_xml('<p><d>2012-12-15</d><t>2012-12-15 15:15:15</t></p>', 'd date, t timestamp');
+select from_xml(
+  '<p><d>12/15 2012</d><t>12/15 2012 15:15:15</t>}</p>',
+  'd date, t timestamp',
+  map('dateFormat', 'MM/dd yyyy', 'timestampFormat', 'MM/dd yyyy HH:mm:ss'));
+select from_xml(
+  '<p><d>02-29</d></p>',
+  'd date',
+  map('dateFormat', 'MM-dd'));
+select from_xml(
+  '<p><t>02-29</t></p>',
+  't timestamp',
+  map('timestampFormat', 'MM-dd'));
+
+-- schema_of_xml error cases
+select schema_of_xml(null);
+CREATE TEMPORARY VIEW xmlTable(xmlField, a) AS SELECT * FROM VALUES ('<p><a>1</a><b>"2"</b></p>', 'a');
+SELECT schema_of_xml(xmlField) FROM xmlTable;
+
+-- Clean up
+DROP VIEW IF EXISTS xmlTable;
diff --git a/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out
new file mode 100644
index 00000000000..61e8e9c8662
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out
@@ -0,0 +1,319 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+select from_xml('<p><a>1</a></p>', 'a INT')
+-- !query schema
+struct<from_xml(<p><a>1</a></p>):struct<a:int>>
+-- !query output
+{"a":1}
+
+
+-- !query
+select from_xml('<p><time>26/08/2015</time></p>', 'time Timestamp', 
map('timestampFormat', 'dd/MM/yyyy'))
+-- !query schema
+struct<from_xml(<p><time>26/08/2015</time></p>):struct<time:timestamp>>
+-- !query output
+{"time":2015-08-26 00:00:00}
+
+
+-- !query
+select from_xml('<p><a>1</a></p>', 1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_SCHEMA.NON_STRING_LITERAL",
+  "sqlState" : "42K07",
+  "messageParameters" : {
+    "inputSchema" : "\"1\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 37,
+    "fragment" : "from_xml('<p><a>1</a></p>', 1)"
+  } ]
+}
+
+
+-- !query
+select from_xml('<p><a>1</a></p>', 'a InvalidType')
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'InvalidType'",
+    "hint" : ": extra input 'InvalidType'"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 51,
+    "fragment" : "from_xml('<p><a>1</a></p>', 'a InvalidType')"
+  } ]
+}
+
+
+-- !query
+select from_xml('<p><a>1</a></p>', 'a INT', named_struct('mode', 'PERMISSIVE'))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_OPTIONS.NON_MAP_FUNCTION",
+  "sqlState" : "42K06",
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 79,
+    "fragment" : "from_xml('<p><a>1</a></p>', 'a INT', named_struct('mode', 
'PERMISSIVE'))"
+  } ]
+}
+
+
+-- !query
+select from_xml('<p><a>1</a></p>', 'a INT', map('mode', 1))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_OPTIONS.NON_STRING_TYPE",
+  "sqlState" : "42K06",
+  "messageParameters" : {
+    "mapType" : "\"MAP<STRING, INT>\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 59,
+    "fragment" : "from_xml('<p><a>1</a></p>', 'a INT', map('mode', 1))"
+  } ]
+}
+
+
+-- !query
+select from_xml()
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "0",
+    "docroot" : "https://spark.apache.org/docs/latest";,
+    "expectedNum" : "[2, 3]",
+    "functionName" : "`from_xml`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 17,
+    "fragment" : "from_xml()"
+  } ]
+}
+
+
+-- !query
+DROP VIEW IF EXISTS xmlTable
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+select from_xml('<p><a>1</a></p>', 'struct<a:array<int>>')
+-- !query schema
+struct<from_xml(<p><a>1</a></p>):struct<a:array<int>>>
+-- !query output
+{"a":[1]}
+
+
+-- !query
+select from_xml('<p><a>1</a><b>"2"</b></p>', 'struct<a:int,b:string>')
+-- !query schema
+struct<from_xml(<p><a>1</a><b>"2"</b></p>):struct<a:int,b:string>>
+-- !query output
+{"a":1,"b":""2""}
+
+
+-- !query
+select schema_of_xml('<p><a>1</a><b>"2"</b></p>')
+-- !query schema
+struct<schema_of_xml(<p><a>1</a><b>"2"</b></p>):string>
+-- !query output
+STRUCT<a: BIGINT, b: STRING>
+
+
+-- !query
+select from_xml('<p><a>1</a><a>2</a><a>3</a></p>', 
schema_of_xml('<p><a>1</a><a>2</a></p>'))
+-- !query schema
+struct<from_xml(<p><a>1</a><a>2</a><a>3</a></p>):struct<a:array<bigint>>>
+-- !query output
+{"a":[1,2,3]}
+
+
+-- !query
+select from_xml('<p><a>1</a><a>2</a></p>', 'struct<a:array<int>>')
+-- !query schema
+struct<from_xml(<p><a>1</a><a>2</a></p>):struct<a:array<int>>>
+-- !query output
+{"a":[1,2]}
+
+
+-- !query
+select from_xml('<p><a>1</a><a>"2"</a></p>', 'struct<a:array<int>>')
+-- !query schema
+struct<from_xml(<p><a>1</a><a>"2"</a></p>):struct<a:array<int>>>
+-- !query output
+{"a":[1]}
+
+
+-- !query
+select from_xml('<p><a>1</a><a></a></p>', 'struct<a:array<int>>')
+-- !query schema
+struct<from_xml(<p><a>1</a><a></a></p>):struct<a:array<int>>>
+-- !query output
+{"a":[1,null]}
+
+
+-- !query
+select from_xml('<p><a attr="1"><b>2</b></a></p>', 'struct<a:map<string,int>>')
+-- !query schema
+struct<from_xml(<p><a attr="1"><b>2</b></a></p>):struct<a:map<string,int>>>
+-- !query output
+{"a":{"_attr":1,"b":2}}
+
+
+-- !query
+select from_xml('<p><d>2012-12-15</d><t>2012-12-15 15:15:15</t></p>', 'd date, 
t timestamp')
+-- !query schema
+struct<from_xml(<p><d>2012-12-15</d><t>2012-12-15 
15:15:15</t></p>):struct<d:date,t:timestamp>>
+-- !query output
+{"d":2012-12-15,"t":2012-12-15 15:15:15}
+
+
+-- !query
+select from_xml(
+  '<p><d>12/15 2012</d><t>12/15 2012 15:15:15</t>}</p>',
+  'd date, t timestamp',
+  map('dateFormat', 'MM/dd yyyy', 'timestampFormat', 'MM/dd yyyy HH:mm:ss'))
+-- !query schema
+struct<from_xml(<p><d>12/15 2012</d><t>12/15 2012 
15:15:15</t>}</p>):struct<d:date,t:timestamp>>
+-- !query output
+{"d":2012-12-15,"t":2012-12-15 15:15:15}
+
+
+-- !query
+select from_xml(
+  '<p><d>02-29</d></p>',
+  'd date',
+  map('dateFormat', 'MM-dd'))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkUpgradeException
+{
+  "errorClass" : 
"INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER",
+  "sqlState" : "42K0B",
+  "messageParameters" : {
+    "config" : "\"spark.sql.legacy.timeParserPolicy\"",
+    "datetime" : "'02-29'"
+  }
+}
+
+
+-- !query
+select from_xml(
+  '<p><t>02-29</t></p>',
+  't timestamp',
+  map('timestampFormat', 'MM-dd'))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkUpgradeException
+{
+  "errorClass" : 
"INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER",
+  "sqlState" : "42K0B",
+  "messageParameters" : {
+    "config" : "\"spark.sql.legacy.timeParserPolicy\"",
+    "datetime" : "'02-29'"
+  }
+}
+
+
+-- !query
+select schema_of_xml(null)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_NULL",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprName" : "xml",
+    "sqlExpr" : "\"schema_of_xml(NULL)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 26,
+    "fragment" : "schema_of_xml(null)"
+  } ]
+}
+
+
+-- !query
+CREATE TEMPORARY VIEW xmlTable(xmlField, a) AS SELECT * FROM VALUES 
('<p><a>1</a><b>"2"</b></p>', 'a')
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT schema_of_xml(xmlField) FROM xmlTable
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputExpr" : "\"xmlField\"",
+    "inputName" : "`xml`",
+    "inputType" : "\"STRING\"",
+    "sqlExpr" : "\"schema_of_xml(xmlField)\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 30,
+    "fragment" : "schema_of_xml(xmlField)"
+  } ]
+}
+
+
+-- !query
+DROP VIEW IF EXISTS xmlTable
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 8ca14385e59..7044b7c90c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -82,10 +82,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
       "bucket", "days", "hours", "months", "years", // Datasource v2 partition 
transformations
       "product", // Discussed in https://github.com/apache/spark/pull/30745
       "unwrap_udt",
-      "collect_top_k",
-      // TODO: XML functions will soon be added to SQL Function registry and removed from this list
-      // https://issues.apache.org/jira/browse/SPARK-44787
-      "from_xml", "schema_of_xml"
+      "collect_top_k"
     )
 
     // We only consider functions matching this pattern, this excludes 
symbolic and other
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
index beea14e3cf8..b03d65219d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
@@ -1264,7 +1264,7 @@ class XmlSuite extends QueryTest with SharedSparkSession {
     val outputDF = Seq("0.0000", "0.01")
       .map { n => s"<Row> <Number>$n</Number> </Row>" }
       .toDF("xml")
-      .withColumn("parsed", from_xml($"xml", schema, Map("rowTag" -> "Row")))
+      .withColumn("parsed", from_xml($"xml", schema, Map("rowTag" -> 
"Row").asJava))
       .select("parsed.Number")
 
     val results = outputDF.collect()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
