This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 00d2b4fa2de [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL
00d2b4fa2de is described below

commit 00d2b4fa2def948e7517bacfce7c75be6a37eb20
Author: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
AuthorDate: Mon Oct 30 14:00:31 2023 +0900

    [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL
    
    ### What changes were proposed in this pull request?
    to_xml: Converts a `StructType` to a XML output string.
    Bindings for python, connect and SQL
    
    ### Why are the changes needed?
    to_xml: Converts a `StructType` to a XML output string.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, new to_xml API.
    
    ### How was this patch tested?
    New unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43503 from sandip-db/to_xml.
    
    Authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/sql/functions.scala     |  31 +++
 .../org/apache/spark/sql/FunctionTestSuite.scala   |   6 +
 .../source/reference/pyspark.sql/functions.rst     |   1 +
 python/pyspark/sql/connect/functions.py            |  10 +
 python/pyspark/sql/functions.py                    |  36 +++
 .../sql/tests/connect/test_connect_function.py     |  14 +
 sql/catalyst/pom.xml                               |   4 +
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   3 +-
 .../sql/catalyst/expressions/xmlExpressions.scala  |  90 ++++++-
 .../spark/sql/catalyst/xml/StaxXmlGenerator.scala  | 295 ++++++++++++---------
 .../apache/spark/sql/catalyst/xml/XmlOptions.scala |   5 +
 sql/core/pom.xml                                   |   4 -
 .../datasources/xml/XmlOutputWriter.scala          |  53 +---
 .../scala/org/apache/spark/sql/functions.scala     |  30 +++
 .../sql-functions/sql-expression-schema.md         |   1 +
 .../analyzer-results/xml-functions.sql.out         | 122 +++++++++
 .../resources/sql-tests/inputs/xml-functions.sql   |  18 +-
 .../sql-tests/results/xml-functions.sql.out        | 134 ++++++++++
 .../sql/execution/datasources/xml/XmlSuite.scala   |  25 ++
 19 files changed, 696 insertions(+), 186 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 9c5adca7e28..1c8f5993d29 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -7470,6 +7470,37 @@ object functions {
     fnWithOptions("schema_of_xml", options.asScala.iterator, xml)
   }
 
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Converts a column containing a `StructType` into a XML 
string with the
+   * specified schema. Throws an exception, in the case of an unsupported type.
+   *
+   * @param e
+   *   a column containing a struct.
+   * @param options
+   *   options to control how the struct column is converted into a XML 
string. It accepts the
+   *   same options as the XML data source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group xml_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def to_xml(e: Column, options: java.util.Map[String, String]): Column =
+    fnWithOptions("to_xml", options.asScala.iterator, e)
+
+  /**
+   * Converts a column containing a `StructType` into a XML string with the 
specified schema.
+   * Throws an exception, in the case of an unsupported type.
+   *
+   * @param e
+   *   a column containing a struct.
+   * @group xml_funcs
+   * @since 4.0.0
+   */
+  def to_xml(e: Column): Column = to_xml(e, Collections.emptyMap())
+
   /**
    * Returns the total number of elements in the array. The function returns 
null for null input.
    *
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
index e350bde9946..748843ec991 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
@@ -237,6 +237,12 @@ class FunctionTestSuite extends ConnectFunSuite {
     from_xml(a, schema, Map.empty[String, String].asJava),
     from_xml(a, schema, Collections.emptyMap[String, String]),
     from_xml(a, lit(schema.json), Collections.emptyMap[String, String]))
+  testEquals(
+    "schema_of_xml",
+    schema_of_xml(lit("<p><a>1.0</a><b>test</b></p>")),
+    schema_of_xml("<p><a>1.0</a><b>test</b></p>"),
+    schema_of_xml(lit("<p><a>1.0</a><b>test</b></p>"), Collections.emptyMap()))
+  testEquals("to_xml", to_xml(a), to_xml(a, Collections.emptyMap[String, 
String]))
 
   testEquals(
     "from_avro",
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index 5e05dac7bc3..4dc10cc1556 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -547,6 +547,7 @@ XML Functions
 
     from_xml
     schema_of_xml
+    to_xml
     xpath
     xpath_boolean
     xpath_double
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index f065e5391fe..38eb814247c 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2200,6 +2200,16 @@ def to_json(col: "ColumnOrName", options: 
Optional[Dict[str, str]] = None) -> Co
 to_json.__doc__ = pysparkfuncs.to_json.__doc__
 
 
+def to_xml(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> 
Column:
+    if options is None:
+        return _invoke_function("to_xml", _to_col(col))
+    else:
+        return _invoke_function("to_xml", _to_col(col), 
_options_to_col(options))
+
+
+to_xml.__doc__ = pysparkfuncs.to_xml.__doc__
+
+
 def transform(
     col: "ColumnOrName",
     f: Union[Callable[[Column], Column], Callable[[Column, Column], Column]],
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 05c22685b09..869506a3558 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -13678,6 +13678,42 @@ def schema_of_xml(xml: "ColumnOrName", options: 
Optional[Dict[str, str]] = None)
     return _invoke_function("schema_of_xml", col, _options_to_str(options))
 
 
+@_try_remote_functions
+def to_xml(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> 
Column:
+    """
+    Converts a column containing a :class:`StructType` into a XML string.
+    Throws an exception, in the case of an unsupported type.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        name of column containing a struct.
+    options: dict, optional
+        options to control converting. accepts the same options as the XML 
datasource.
+        See `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option>`_
+        for the version you use.
+
+        .. # noqa
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a XML string converted from given :class:`StructType`.
+
+    Examples
+    --------
+    >>> from pyspark.sql import Row
+    >>> data = [(1, Row(age=2, name='Alice'))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_xml(df.value, {'rowTag':'person'}).alias("xml")).collect()
+    [Row(xml='<person>\\n    <age>2</age>\\n    <name>Alice</name>\\n</person>')]
+    """
+
+    return _invoke_function("to_xml", _to_java_column(col), 
_options_to_str(options))
+
+
 @_try_remote_functions
 def schema_of_csv(csv: "ColumnOrName", options: Optional[Dict[str, str]] = 
None) -> Column:
     """
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index bc0cf162648..9adae0f6f75 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -1879,6 +1879,10 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, 
PandasOnSparkTestUtils, S
                 cdf.select(CF.from_xml("b", schema, {"mode": "FAILFAST"})),
                 sdf.select(SF.from_xml("b", schema, {"mode": "FAILFAST"})),
             )
+            self.compare_by_show(
+                sdf.select(SF.to_xml(SF.struct(SF.from_xml("b", schema)), 
{"rowTag": "person"})),
+                sdf.select(SF.to_xml(SF.struct(SF.from_xml("b", schema)), 
{"rowTag": "person"})),
+            )
 
         c_schema = CF.schema_of_xml(CF.lit("""<p><a>1</a></p>"""))
         s_schema = SF.schema_of_xml(SF.lit("""<p><a>1</a></p>"""))
@@ -1923,6 +1927,16 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, 
PandasOnSparkTestUtils, S
             ).toPandas(),
         )
 
+        # test to_xml
+        self.compare_by_show(
+            cdf.select(CF.to_xml(CF.struct(CF.lit("a"), CF.lit("b")))),
+            sdf.select(SF.to_xml(SF.struct(SF.lit("a"), SF.lit("b")))),
+        )
+        self.compare_by_show(
+            cdf.select(CF.to_xml(CF.struct(CF.lit("a"), CF.lit("b")), {"mode": 
"FAILFAST"})),
+            sdf.select(SF.to_xml(SF.struct(SF.lit("a"), SF.lit("b")), {"mode": 
"FAILFAST"})),
+        )
+
     def test_string_functions_one_arg(self):
         query = """
             SELECT * FROM VALUES
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 8f2b9ccffeb..e7f8cbe0fe6 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -113,6 +113,10 @@
       <groupId>org.apache.ws.xmlschema</groupId>
       <artifactId>xmlschema-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish.jaxb</groupId>
+      <artifactId>txw2</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.datasketches</groupId>
       <artifactId>datasketches-java</artifactId>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 8be3199ef9b..1449764cdd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -834,7 +834,8 @@ object FunctionRegistry {
 
     // Xml
     expression[XmlToStructs]("from_xml"),
-    expression[SchemaOfXml]("schema_of_xml")
+    expression[SchemaOfXml]("schema_of_xml"),
+    expression[StructsToXml]("to_xml")
   )
 
   val builtin: SimpleFunctionRegistry = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
index df63429ae33..047b669fc89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
@@ -16,13 +16,15 @@
  */
 package org.apache.spark.sql.catalyst.expressions
 
+import java.io.CharArrayWriter
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, ExpressionDescription, ExprUtils, NullIntolerant, 
TimeZoneAwareExpression, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, 
FailureSafeParser, GenericArrayData, PermissiveMode}
-import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, ValidatorUtil, 
XmlInferSchema, XmlOptions}
+import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, 
ValidatorUtil, XmlInferSchema, XmlOptions}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -227,3 +229,89 @@ case class SchemaOfXml(
   override protected def withNewChildInternal(newChild: Expression): 
SchemaOfXml =
     copy(child = newChild)
 }
+
+/**
+ * Converts a [[StructType]] to a XML output string.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(expr[, options]) - Returns a XML string with a given struct 
value",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
+       <ROW>
+           <a>1</a>
+           <b>2</b>
+       </ROW>
+      > SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 
'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
+       <ROW>
+           <time>26/08/2015</time>
+       </ROW>
+  """,
+  since = "4.0.0",
+  group = "xml_funcs")
+// scalastyle:on line.size.limit
+case class StructsToXml(
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+  with TimeZoneAwareExpression
+  with CodegenFallback
+  with ExpectsInputTypes
+  with NullIntolerant {
+  override def nullable: Boolean = true
+
+  def this(options: Map[String, String], child: Expression) = this(options, 
child, None)
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression) = this(Map.empty, child, None)
+
+  def this(child: Expression, options: Expression) =
+    this(
+      options = ExprUtils.convertToMapData(options),
+      child = child,
+      timeZoneId = None)
+
+  @transient
+  lazy val writer = new CharArrayWriter()
+
+  @transient
+  lazy val inputSchema: StructType = child.dataType match {
+    case st: StructType => st
+    case other =>
+      throw new IllegalArgumentException(s"Unsupported input type 
${other.catalogString}")
+  }
+
+  @transient
+  lazy val gen = new StaxXmlGenerator(
+    inputSchema, writer, new XmlOptions(options, timeZoneId.get), false)
+
+  // This converts rows to the XML output according to the given schema.
+  @transient
+  lazy val converter: Any => UTF8String = {
+    def getAndReset(): UTF8String = {
+      gen.flush()
+      val xmlString = writer.toString
+      writer.reset()
+      UTF8String.fromString(xmlString)
+    }
+    (row: Any) =>
+      gen.write(row.asInstanceOf[InternalRow])
+      getAndReset()
+  }
+
+  override def dataType: DataType = StringType
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+    copy(timeZoneId = Option(timeZoneId))
+
+  override def nullSafeEval(value: Any): Any = converter(value)
+
+  override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
+
+  override def prettyName: String = "to_xml"
+
+  override protected def withNewChildInternal(newChild: Expression): 
StructsToXml =
+    copy(child = newChild)
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala
index f1cbc8996b0..4477cf50823 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala
@@ -16,164 +16,201 @@
  */
 package org.apache.spark.sql.catalyst.xml
 
+import java.io.Writer
 import java.sql.Timestamp
-import javax.xml.stream.XMLStreamWriter
+import javax.xml.stream.XMLOutputFactory
 
 import scala.collection.Map
 
+import com.sun.xml.txw2.output.IndentingXMLStreamWriter
+import org.apache.hadoop.shaded.com.ctc.wstx.api.WstxOutputProperties
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
-// This class is borrowed from Spark json datasource.
-private[sql] object StaxXmlGenerator {
+class StaxXmlGenerator(
+    schema: StructType,
+    writer: Writer,
+    options: XmlOptions,
+    validateStructure: Boolean = true) {
+
+  require(options.attributePrefix.nonEmpty,
+    "'attributePrefix' option should not be empty string.")
+  private val indentDisabled = options.indent == ""
+
+  private val gen = {
+    val factory = XMLOutputFactory.newInstance()
+    // to_xml disables structure validation to allow multiple root tags
+    factory.setProperty(WstxOutputProperties.P_OUTPUT_VALIDATE_STRUCTURE, 
validateStructure)
+    val xmlWriter = factory.createXMLStreamWriter(writer)
+    if (!indentDisabled) {
+      val indentingXmlWriter = new IndentingXMLStreamWriter(xmlWriter)
+      indentingXmlWriter.setIndentStep(options.indent)
+      indentingXmlWriter
+    } else {
+      xmlWriter
+    }
+  }
+
+  private var rootElementWritten: Boolean = false
+  def writeDeclaration(): Unit = {
+    // Allow a root tag to be like "rootTag foo='bar'"
+    // This is hacky; won't deal correctly with spaces in attributes, but want
+    // to make this at least work for simple cases without much complication
+    val rootTagTokens = options.rootTag.split(" ")
+    val rootElementName = rootTagTokens.head
+    val rootAttributes: Map[String, String] =
+      if (rootTagTokens.length > 1) {
+        rootTagTokens.tail.map { kv =>
+          val Array(k, v) = kv.split("=")
+          k -> v.replaceAll("['\"]", "")
+        }.toMap
+      } else {
+        Map.empty
+      }
+    val declaration = options.declaration
+    if (declaration != null && declaration.nonEmpty) {
+      gen.writeProcessingInstruction("xml", declaration)
+      gen.writeCharacters("\n")
+    }
+    gen.writeStartElement(rootElementName)
+    rootAttributes.foreach { case (k, v) =>
+      gen.writeAttribute(k, v)
+    }
+    if (indentDisabled) {
+      gen.writeCharacters("\n")
+    }
+    rootElementWritten = true
+  }
+
+  def flush(): Unit = gen.flush()
+
+  def close(): Unit = {
+    if (rootElementWritten) {
+      gen.writeEndElement()
+      gen.close()
+    }
+    writer.close()
+  }
 
   /**
    * Transforms a single Row to XML
    *
-   * @param schema
-   *   the schema object used for conversion
-   * @param writer
-   *   a XML writer object
-   * @param options
-   *   options for XML datasource.
    * @param row
-   *   The row to convert
+   * The row to convert
    */
-  def apply(schema: StructType, writer: XMLStreamWriter, options: XmlOptions)(
-      row: InternalRow): Unit = {
-
-    require(
-      options.attributePrefix.nonEmpty,
-      "'attributePrefix' option should not be empty string.")
+  def write(row: InternalRow): Unit = {
+    writeChildElement(options.rowTag, schema, row)
+    if (indentDisabled) {
+      gen.writeCharacters("\n")
+    }
+  }
 
-    def writeChildElement(name: String, dt: DataType, v: Any): Unit = (name, 
dt, v) match {
+  def writeChildElement(name: String, dt: DataType, v: Any): Unit = (name, dt, 
v) match {
+    // If this is meant to be value but in no child, write only a value
+    case (_, _, null) | (_, NullType, _) if options.nullValue == null =>
+    // Because usually elements having `null` do not exist, just do not write
+    // elements when given values are `null`.
+    case (_, _, _) if name == options.valueTag =>
       // If this is meant to be value but in no child, write only a value
-      case (_, _, null) | (_, NullType, _) if options.nullValue == null =>
-      // Because usually elements having `null` do not exist, just do not write
-      // elements when given values are `null`.
-      case (_, _, _) if name == options.valueTag =>
-        // If this is meant to be value but in no child, write only a value
-        writeElement(dt, v, options)
-      case (_, _, _) =>
-        writer.writeStartElement(name)
-        writeElement(dt, v, options)
-        writer.writeEndElement()
-    }
+      writeElement(dt, v, options)
+    case (_, _, _) =>
+      gen.writeStartElement(name)
+      writeElement(dt, v, options)
+      gen.writeEndElement()
+  }
 
-    def writeChild(name: String, dt: DataType, v: Any): Unit = {
-      (dt, v) match {
-        // If this is meant to be attribute, write an attribute
-        case (_, null) | (NullType, _)
-            if name.startsWith(options.attributePrefix) && name != 
options.valueTag =>
-          Option(options.nullValue).foreach {
-            
writer.writeAttribute(name.substring(options.attributePrefix.length), _)
-          }
-        case _ if name.startsWith(options.attributePrefix) && name != 
options.valueTag =>
-          
writer.writeAttribute(name.substring(options.attributePrefix.length), 
v.toString)
-
-        // For ArrayType, we just need to write each as XML element.
-        case (ArrayType(ty, _), v: ArrayData) =>
-          (0 until v.numElements()).foreach { i =>
-            writeChildElement(name, ty, v.get(i, ty))
-          }
-        // For other datatypes, we just write normal elements.
-        case _ =>
-          writeChildElement(name, dt, v)
-      }
-    }
+  def writeChild(name: String, dt: DataType, v: Any): Unit = {
+    (dt, v) match {
+      // If this is meant to be attribute, write an attribute
+      case (_, null) | (NullType, _)
+        if name.startsWith(options.attributePrefix) && name != 
options.valueTag =>
+        Option(options.nullValue).foreach {
+          gen.writeAttribute(name.substring(options.attributePrefix.length), _)
+        }
+      case _ if name.startsWith(options.attributePrefix) && name != 
options.valueTag =>
+        gen.writeAttribute(name.substring(options.attributePrefix.length), 
v.toString)
 
-    def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt, 
v) match {
-      case (_, null) | (NullType, _) => 
writer.writeCharacters(options.nullValue)
-      case (StringType, v: UTF8String) => writer.writeCharacters(v.toString)
-      case (StringType, v: String) => writer.writeCharacters(v)
-      case (TimestampType, v: Timestamp) =>
-        
writer.writeCharacters(options.timestampFormatterInWrite.format(v.toInstant()))
-      case (TimestampType, v: Long) =>
-        writer.writeCharacters(options.timestampFormatterInWrite.format(v))
-      case (DateType, v: Int) =>
-        writer.writeCharacters(options.dateFormatterInWrite.format(v))
-      case (IntegerType, v: Int) => writer.writeCharacters(v.toString)
-      case (ShortType, v: Short) => writer.writeCharacters(v.toString)
-      case (FloatType, v: Float) => writer.writeCharacters(v.toString)
-      case (DoubleType, v: Double) => writer.writeCharacters(v.toString)
-      case (LongType, v: Long) => writer.writeCharacters(v.toString)
-      case (DecimalType(), v: java.math.BigDecimal) => 
writer.writeCharacters(v.toString)
-      case (DecimalType(), v: Decimal) => writer.writeCharacters(v.toString)
-      case (ByteType, v: Byte) => writer.writeCharacters(v.toString)
-      case (BooleanType, v: Boolean) => writer.writeCharacters(v.toString)
-
-      // For the case roundtrip in reading and writing XML files, 
[[ArrayType]] cannot have
-      // [[ArrayType]] as element type. It always wraps the element with 
[[StructType]]. So,
-      // this case only can happen when we convert a normal [[DataFrame]] to 
XML file.
-      // When [[ArrayType]] has [[ArrayType]] as elements, it is confusing 
what is element name
-      // for XML file.
+      // For ArrayType, we just need to write each as XML element.
       case (ArrayType(ty, _), v: ArrayData) =>
         (0 until v.numElements()).foreach { i =>
-          writeChild(options.arrayElementName, ty, v.get(i, ty))
+          writeChildElement(name, ty, v.get(i, ty))
         }
+      // For other datatypes, we just write normal elements.
+      case _ =>
+        writeChildElement(name, dt, v)
+    }
+  }
 
-      case (MapType(_, vt, _), mv: Map[_, _]) =>
-        val (attributes, elements) = mv.toSeq.partition { case (f, _) =>
-          f.toString.startsWith(options.attributePrefix) && f.toString != 
options.valueTag
-        }
-        // We need to write attributes first before the value.
-        (attributes ++ elements).foreach { case (k, v) =>
-          writeChild(k.toString, vt, v)
-        }
+  def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt, v) 
match {
+    case (_, null) | (NullType, _) => gen.writeCharacters(options.nullValue)
+    case (StringType, v: UTF8String) => gen.writeCharacters(v.toString)
+    case (StringType, v: String) => gen.writeCharacters(v)
+    case (TimestampType, v: Timestamp) =>
+      
gen.writeCharacters(options.timestampFormatterInWrite.format(v.toInstant()))
+    case (TimestampType, v: Long) =>
+      gen.writeCharacters(options.timestampFormatterInWrite.format(v))
+    case (DateType, v: Int) =>
+      gen.writeCharacters(options.dateFormatterInWrite.format(v))
+    case (IntegerType, v: Int) => gen.writeCharacters(v.toString)
+    case (ShortType, v: Short) => gen.writeCharacters(v.toString)
+    case (FloatType, v: Float) => gen.writeCharacters(v.toString)
+    case (DoubleType, v: Double) => gen.writeCharacters(v.toString)
+    case (LongType, v: Long) => gen.writeCharacters(v.toString)
+    case (DecimalType(), v: java.math.BigDecimal) => 
gen.writeCharacters(v.toString)
+    case (DecimalType(), v: Decimal) => gen.writeCharacters(v.toString)
+    case (ByteType, v: Byte) => gen.writeCharacters(v.toString)
+    case (BooleanType, v: Boolean) => gen.writeCharacters(v.toString)
+
+    // For the case roundtrip in reading and writing XML files, [[ArrayType]] 
cannot have
+    // [[ArrayType]] as element type. It always wraps the element with 
[[StructType]]. So,
+    // this case only can happen when we convert a normal [[DataFrame]] to XML 
file.
+    // When [[ArrayType]] has [[ArrayType]] as elements, it is confusing what 
is element name
+    // for XML file.
+    case (ArrayType(ty, _), v: ArrayData) =>
+      (0 until v.numElements()).foreach { i =>
+        writeChild(options.arrayElementName, ty, v.get(i, ty))
+      }
 
-      case (mt: MapType, mv: MapData) => writeMapData(mt, mv)
+    case (MapType(_, vt, _), mv: Map[_, _]) =>
+      val (attributes, elements) = mv.toSeq.partition { case (f, _) =>
+        f.toString.startsWith(options.attributePrefix) && f.toString != 
options.valueTag
+      }
+      // We need to write attributes first before the value.
+      (attributes ++ elements).foreach { case (k, v) =>
+        writeChild(k.toString, vt, v)
+      }
 
-      case (st: StructType, r: InternalRow) =>
-        val (attributes, elements) = st.zip(r.toSeq(st)).partition { case (f, 
_) =>
-          f.name.startsWith(options.attributePrefix) && f.name != 
options.valueTag
-        }
-        // We need to write attributes first before the value.
-        (attributes ++ elements).foreach { case (field, value) =>
-          writeChild(field.name, field.dataType, value)
-        }
+    case (mt: MapType, mv: MapData) => writeMapData(mt, mv)
 
-      case (_, _) =>
-        throw new IllegalArgumentException(
-          s"Failed to convert value $v (class of ${v.getClass}) in type $dt to 
XML.")
-    }
-
-    def writeMapData(mapType: MapType, map: MapData): Unit = {
-      val keyArray = map.keyArray()
-      val valueArray = map.valueArray()
-      // write attributes first
-      Seq(true, false).foreach { writeAttribute =>
-        (0 until map.numElements()).foreach { i =>
-          val key = keyArray.get(i, mapType.keyType).toString
-          val isAttribute = key.startsWith(options.attributePrefix) && key != 
options.valueTag
-          if (writeAttribute == isAttribute) {
-            writeChild(key, mapType.valueType, valueArray.get(i, 
mapType.valueType))
-          }
-        }
+    case (st: StructType, r: InternalRow) =>
+      val (attributes, elements) = st.zip(r.toSeq(st)).partition { case (f, _) 
=>
+        f.name.startsWith(options.attributePrefix) && f.name != 
options.valueTag
+      }
+      // We need to write attributes first before the value.
+      (attributes ++ elements).foreach { case (field, value) =>
+        writeChild(field.name, field.dataType, value)
       }
-    }
 
-    val rowSeq = row.toSeq(schema)
-    val (attributes, elements) = schema.zip(rowSeq).partition { case (f, _) =>
-      f.name.startsWith(options.attributePrefix) && f.name != options.valueTag
-    }
-    // Writing attributes
-    writer.writeStartElement(options.rowTag)
-    attributes.foreach {
-      case (f, v) if v == null || f.dataType == NullType =>
-        Option(options.nullValue).foreach {
-          
writer.writeAttribute(f.name.substring(options.attributePrefix.length), _)
+    case (_, _) =>
+      throw new IllegalArgumentException(
+        s"Failed to convert value $v (class of ${v.getClass}) in type $dt to 
XML.")
+  }
+
+  def writeMapData(mapType: MapType, map: MapData): Unit = {
+    val keyArray = map.keyArray()
+    val valueArray = map.valueArray()
+    // write attributes first
+    Seq(true, false).foreach { writeAttribute =>
+      (0 until map.numElements()).foreach { i =>
+        val key = keyArray.get(i, mapType.keyType).toString
+        val isAttribute = key.startsWith(options.attributePrefix) && key != 
options.valueTag
+        if (writeAttribute == isAttribute) {
+          writeChild(key, mapType.valueType, valueArray.get(i, 
mapType.valueType))
         }
-      case (f, v) =>
-        
writer.writeAttribute(f.name.substring(options.attributePrefix.length), 
v.toString)
+      }
     }
-    // Writing elements
-    val (names, values) = elements.unzip
-    val elementSchema = StructType(schema.filter(names.contains))
-
-    val elementRow = InternalRow.fromSeq(rowSeq.filter(values.contains))
-    writeElement(elementSchema, elementRow, options)
-    writer.writeEndElement()
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
index 763aa877ca0..7d049fdd82b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
@@ -100,6 +100,9 @@ private[sql] class XmlOptions(
   val wildcardColName =
     parameters.getOrElse(WILDCARD_COL_NAME, 
XmlOptions.DEFAULT_WILDCARD_COL_NAME)
   val ignoreNamespace = getBool(IGNORE_NAMESPACE, false)
+  // setting indent to "" disables indentation in the generated XML.
+  // Each row will be written in a new line.
+  val indent = parameters.getOrElse(INDENT, DEFAULT_INDENT)
 
   /**
    * Infer columns with all valid date entries as date type (otherwise 
inferred as string or
@@ -198,6 +201,7 @@ private[sql] object XmlOptions extends DataSourceOptions {
   val DEFAULT_CHARSET: String = StandardCharsets.UTF_8.name
   val DEFAULT_NULL_VALUE: String = null
   val DEFAULT_WILDCARD_COL_NAME = "xs_any"
+  val DEFAULT_INDENT = "    "
   val ROW_TAG = newOption("rowTag")
   val ROOT_TAG = newOption("rootTag")
   val DECLARATION = newOption("declaration")
@@ -222,6 +226,7 @@ private[sql] object XmlOptions extends DataSourceOptions {
   val DATE_FORMAT = newOption("dateFormat")
   val TIMESTAMP_FORMAT = newOption("timestampFormat")
   val TIME_ZONE = newOption("timeZone")
+  val INDENT = newOption("indent")
   // Options with alternative
   val ENCODING = "encoding"
   val CHARSET = "charset"
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index b2b8398c9d5..8fabfd4699d 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -145,10 +145,6 @@
       <groupId>org.apache.ws.xmlschema</groupId>
       <artifactId>xmlschema-core</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.glassfish.jaxb</groupId>
-      <artifactId>txw2</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm9-shaded</artifactId>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlOutputWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlOutputWriter.scala
index cde866dcedf..ac3dfb287ad 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlOutputWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlOutputWriter.scala
@@ -17,15 +17,13 @@
 package org.apache.spark.sql.execution.datasources.xml
 
 import java.nio.charset.Charset
-import javax.xml.stream.XMLOutputFactory
 
-import com.sun.xml.txw2.output.IndentingXMLStreamWriter
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.{xml, InternalRow}
-import org.apache.spark.sql.catalyst.xml.XmlOptions
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, XmlOptions}
 import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
 import org.apache.spark.sql.types.StructType
 
@@ -35,59 +33,20 @@ class XmlOutputWriter(
     context: TaskAttemptContext,
     options: XmlOptions) extends OutputWriter with Logging {
 
-  private val DEFAULT_INDENT = "    "
   private val charset = Charset.forName(options.charset)
 
   private val writer = CodecStreams.createOutputStreamWriter(context, new 
Path(path), charset)
-  private val factory = XMLOutputFactory.newInstance()
-  private val xmlWriter = factory.createXMLStreamWriter(writer)
-  private val indentingXmlWriter = new IndentingXMLStreamWriter(xmlWriter)
-  indentingXmlWriter.setIndentStep(DEFAULT_INDENT)
 
-  // Allow a root tag to be like "rootTag foo='bar'"
-  // This is hacky; won't deal correctly with spaces in attributes, but want
-  // to make this at least work for simple cases without much complication
-  private val rootTagTokens = options.rootTag.split(" ")
-  private val rootElementName = rootTagTokens.head
-  private val rootAttributes: Map[String, String] =
-    if (rootTagTokens.length > 1) {
-      rootTagTokens.tail.map { kv =>
-        val Array(k, v) = kv.split("=")
-        k -> v.replaceAll("['\"]", "")
-      }.toMap
-    } else {
-      Map.empty
-    }
-  private val declaration = options.declaration
-
-
-  // private val gen = new UnivocityGenerator(dataSchema, writer, params)
+  private val gen = new StaxXmlGenerator(dataSchema, writer, options)
 
   private var firstRow: Boolean = true
-
   override def write(row: InternalRow): Unit = {
     if (firstRow) {
-      if (declaration != null && declaration.nonEmpty) {
-        indentingXmlWriter.writeProcessingInstruction("xml", declaration)
-        indentingXmlWriter.writeCharacters("\n")
-      }
-      indentingXmlWriter.writeStartElement(rootElementName)
-      rootAttributes.foreach { case (k, v) =>
-        indentingXmlWriter.writeAttribute(k, v)
-      }
+      gen.writeDeclaration()
       firstRow = false
     }
-    xml.StaxXmlGenerator(
-      dataSchema,
-      indentingXmlWriter,
-      options)(row)
+    gen.write(row)
   }
 
-  override def close(): Unit = {
-    if (!firstRow) {
-      indentingXmlWriter.writeEndElement()
-      indentingXmlWriter.close()
-    }
-    writer.close()
-  }
+  override def close(): Unit = gen.close()
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 34e18cdf27a..b5e40fe35cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -7218,6 +7218,36 @@ object functions {
     withExpr(SchemaOfXml(xml.expr, options.asScala.toMap))
   }
 
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Converts a column containing a `StructType` into a XML 
string with
+   * the specified schema. Throws an exception, in the case of an unsupported 
type.
+   *
+   * @param e       a column containing a struct.
+   * @param options options to control how the struct column is converted into 
a XML string.
+   *                It accepts the same options as the XML data source.
+   *                See
+   *                <a href=
+   *                
"https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group xml_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def to_xml(e: Column, options: java.util.Map[String, String]): Column =
+    fnWithOptions("to_xml", options.asScala.iterator, e)
+
+  /**
+   * Converts a column containing a `StructType` into a XML string with the 
specified schema.
+   * Throws an exception, in the case of an unsupported type.
+   *
+   * @param e a column containing a struct.
+   * @group xml_funcs
+   * @since 4.0.0
+   */
+  def to_xml(e: Column): Column = to_xml(e, Map.empty[String, String].asJava)
+
   /**
    * A transform for timestamps and dates to partition data into years.
    *
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md 
b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 42907b52cda..017cc474ea0 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -318,6 +318,7 @@
 | org.apache.spark.sql.catalyst.expressions.StringTrimRight | rtrim | SELECT 
rtrim('    SparkSQL   ') | struct<rtrim(    SparkSQL   ):string> |
 | org.apache.spark.sql.catalyst.expressions.StructsToCsv | to_csv | SELECT 
to_csv(named_struct('a', 1, 'b', 2)) | struct<to_csv(named_struct(a, 1, b, 
2)):string> |
 | org.apache.spark.sql.catalyst.expressions.StructsToJson | to_json | SELECT 
to_json(named_struct('a', 1, 'b', 2)) | struct<to_json(named_struct(a, 1, b, 
2)):string> |
+| org.apache.spark.sql.catalyst.expressions.StructsToXml | to_xml | SELECT 
to_xml(named_struct('a', 1, 'b', 2)) | struct<to_xml(named_struct(a, 1, b, 
2)):string> |
 | org.apache.spark.sql.catalyst.expressions.Substring | substr | SELECT 
substr('Spark SQL', 5) | struct<substr(Spark SQL, 5, 2147483647):string> |
 | org.apache.spark.sql.catalyst.expressions.Substring | substring | SELECT 
substring('Spark SQL', 5) | struct<substring(Spark SQL, 5, 2147483647):string> |
 | org.apache.spark.sql.catalyst.expressions.SubstringIndex | substring_index | 
SELECT substring_index('www.apache.org', '.', 2) | 
struct<substring_index(www.apache.org, ., 2):string> |
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out
index e62f4aab344..51cf3d976f6 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out
@@ -1,4 +1,126 @@
 -- Automatically generated by SQLQueryTestSuite
+-- !query
+select to_xml(named_struct('a', 1, 'b', 2), map('indent', ''))
+-- !query analysis
+Project [to_xml((indent,), named_struct(a, 1, b, 2), 
Some(America/Los_Angeles)) AS to_xml(named_struct(a, 1, b, 2))#x]
++- OneRowRelation
+
+
+-- !query
+select to_xml(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), 
map('timestampFormat', 'dd/MM/yyyy', 'indent', ''))
+-- !query analysis
+Project [to_xml((timestampFormat,dd/MM/yyyy), (indent,), named_struct(time, 
to_timestamp(2015-08-26, Some(yyyy-MM-dd), TimestampType, 
Some(America/Los_Angeles), false)), Some(America/Los_Angeles)) AS 
to_xml(named_struct(time, to_timestamp(2015-08-26, yyyy-MM-dd)))#x]
++- OneRowRelation
+
+
+-- !query
+select to_xml(array(named_struct('a', 1, 'b', 2)))
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"array(named_struct(a, 1, b, 2))\"",
+    "inputType" : "\"ARRAY<STRUCT<a: INT, b: INT>>\"",
+    "paramIndex" : "1",
+    "requiredType" : "\"STRUCT\"",
+    "sqlExpr" : "\"to_xml(array(named_struct(a, 1, b, 2)))\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 50,
+    "fragment" : "to_xml(array(named_struct('a', 1, 'b', 2)))"
+  } ]
+}
+
+
+-- !query
+select to_xml(map('a', 1))
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"map(a, 1)\"",
+    "inputType" : "\"MAP<STRING, INT>\"",
+    "paramIndex" : "1",
+    "requiredType" : "\"STRUCT\"",
+    "sqlExpr" : "\"to_xml(map(a, 1))\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 26,
+    "fragment" : "to_xml(map('a', 1))"
+  } ]
+}
+
+
+-- !query
+select to_xml(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_OPTIONS.NON_MAP_FUNCTION",
+  "sqlState" : "42K06",
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 79,
+    "fragment" : "to_xml(named_struct('a', 1, 'b', 2), named_struct('mode', 
'PERMISSIVE'))"
+  } ]
+}
+
+
+-- !query
+select to_xml(named_struct('a', 1, 'b', 2), map('mode', 1))
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_OPTIONS.NON_STRING_TYPE",
+  "sqlState" : "42K06",
+  "messageParameters" : {
+    "mapType" : "\"MAP<STRING, INT>\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 59,
+    "fragment" : "to_xml(named_struct('a', 1, 'b', 2), map('mode', 1))"
+  } ]
+}
+
+
+-- !query
+select to_xml()
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "0",
+    "docroot" : "https://spark.apache.org/docs/latest",
+    "expectedNum" : "[1, 2]",
+    "functionName" : "`to_xml`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 15,
+    "fragment" : "to_xml()"
+  } ]
+}
+
+
 -- !query
 select from_xml('<p><a>1</a></p>', 'a INT')
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql 
b/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql
index cdf56712b11..7e3d21ef753 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql
@@ -1,4 +1,14 @@
--- from_json
+-- to_xml
+select to_xml(named_struct('a', 1, 'b', 2), map('indent', ''));
+select to_xml(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), 
map('timestampFormat', 'dd/MM/yyyy', 'indent', ''));
+-- Check if errors handled
+select to_xml(array(named_struct('a', 1, 'b', 2)));
+select to_xml(map('a', 1));
+select to_xml(named_struct('a', 1, 'b', 2), named_struct('mode', 
'PERMISSIVE'));
+select to_xml(named_struct('a', 1, 'b', 2), map('mode', 1));
+select to_xml();
+
+-- from_xml
 select from_xml('<p><a>1</a></p>', 'a INT');
 select from_xml('<p><time>26/08/2015</time></p>', 'time Timestamp', 
map('timestampFormat', 'dd/MM/yyyy'));
 -- Check if errors handled
@@ -11,15 +21,15 @@ select from_xml();
 -- Clean up
 DROP VIEW IF EXISTS xmlTable;
 
--- from_json - complex types
+-- from_xml - complex types
 select from_xml('<p><a>1</a></p>', 'struct<a:array<int>>');
 select from_xml('<p><a>1</a><b>"2"</b></p>', 'struct<a:int,b:string>');
 
--- infer schema of json literal
+-- infer schema of xml literal
 select schema_of_xml('<p><a>1</a><b>"2"</b></p>');
 select from_xml('<p><a>1</a><a>2</a><a>3</a></p>', 
schema_of_xml('<p><a>1</a><a>2</a></p>'));
 
--- from_json - array type
+-- from_xml - array type
 select from_xml('<p><a>1</a><a>2</a></p>', 'struct<a:array<int>>');
 select from_xml('<p><a>1</a><a>"2"</a></p>', 'struct<a:array<int>>');
 select from_xml('<p><a>1</a><a></a></p>', 'struct<a:array<int>>');
diff --git 
a/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out
index 61e8e9c8662..704addb7a93 100644
--- a/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out
@@ -1,4 +1,138 @@
 -- Automatically generated by SQLQueryTestSuite
+-- !query
+select to_xml(named_struct('a', 1, 'b', 2), map('indent', ''))
+-- !query schema
+struct<to_xml(named_struct(a, 1, b, 2)):string>
+-- !query output
+<ROW><a>1</a><b>2</b></ROW>
+
+
+-- !query
+select to_xml(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), 
map('timestampFormat', 'dd/MM/yyyy', 'indent', ''))
+-- !query schema
+struct<to_xml(named_struct(time, to_timestamp(2015-08-26, yyyy-MM-dd))):string>
+-- !query output
+<ROW><time>26/08/2015</time></ROW>
+
+
+-- !query
+select to_xml(array(named_struct('a', 1, 'b', 2)))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"array(named_struct(a, 1, b, 2))\"",
+    "inputType" : "\"ARRAY<STRUCT<a: INT, b: INT>>\"",
+    "paramIndex" : "1",
+    "requiredType" : "\"STRUCT\"",
+    "sqlExpr" : "\"to_xml(array(named_struct(a, 1, b, 2)))\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 50,
+    "fragment" : "to_xml(array(named_struct('a', 1, 'b', 2)))"
+  } ]
+}
+
+
+-- !query
+select to_xml(map('a', 1))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "inputSql" : "\"map(a, 1)\"",
+    "inputType" : "\"MAP<STRING, INT>\"",
+    "paramIndex" : "1",
+    "requiredType" : "\"STRUCT\"",
+    "sqlExpr" : "\"to_xml(map(a, 1))\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 26,
+    "fragment" : "to_xml(map('a', 1))"
+  } ]
+}
+
+
+-- !query
+select to_xml(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_OPTIONS.NON_MAP_FUNCTION",
+  "sqlState" : "42K06",
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 79,
+    "fragment" : "to_xml(named_struct('a', 1, 'b', 2), named_struct('mode', 
'PERMISSIVE'))"
+  } ]
+}
+
+
+-- !query
+select to_xml(named_struct('a', 1, 'b', 2), map('mode', 1))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_OPTIONS.NON_STRING_TYPE",
+  "sqlState" : "42K06",
+  "messageParameters" : {
+    "mapType" : "\"MAP<STRING, INT>\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 59,
+    "fragment" : "to_xml(named_struct('a', 1, 'b', 2), map('mode', 1))"
+  } ]
+}
+
+
+-- !query
+select to_xml()
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "0",
+    "docroot" : "https://spark.apache.org/docs/latest",
+    "expectedNum" : "[1, 2]",
+    "functionName" : "`to_xml`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 15,
+    "fragment" : "to_xml()"
+  } ]
+}
+
+
 -- !query
 select from_xml('<p><a>1</a></p>', 'a INT')
 -- !query schema
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
index 7e5817bc3a0..20600848019 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
@@ -1287,6 +1287,31 @@ class XmlSuite extends QueryTest with SharedSparkSession 
{
     assert(result.select("decoded").head().get(0) === Row(null, null))
   }
 
+  test("from_xml to to_xml round trip") {
+    val xmlData = Seq(
+      "<person><age>100</age><name>Alice</name></person>",
+      "<person><age>100</age><name>Alice</name></person>",
+      "<person><age>100</age><name>Alice</name></person>")
+    val df = xmlData.toDF("xmlString")
+    val xmlSchema = schema_of_xml(xmlData.head)
+
+    val df2 = df.withColumn("parsed",
+      from_xml(df.col("xmlString"), xmlSchema))
+    val df3 = df2.select(to_xml($"parsed", Map("rowTag" -> "person").asJava))
+    val xmlResult = df3.collect().map(_.getString(0).replaceAll("\\s+", ""))
+    assert(xmlData.sortBy(_.toString) === xmlResult.sortBy(_.toString))
+  }
+
+  test("to_xml to from_xml round trip") {
+    val df = spark.read.option("rowTag", "ROW").xml(getTestResourcePath(resDir 
+ "cars.xml"))
+    val df1 = df.select(to_xml(struct("*")).as("xmlString"))
+    val schema = schema_of_xml(df1.select("xmlString").head().getString(0))
+    val df2 = df1.select(from_xml($"xmlString", schema).as("fromXML"))
+    val df3 = df2.select(col("fromXML.*"))
+    assert(df3.collect().length === 3)
+    checkAnswer(df3, df)
+  }
+
   test("decimals with scale greater than precision") {
     val spark = this.spark;
     import spark.implicits._


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

Reply via email to