This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 89041a4a8c7 [SPARK-44788][CONNECT][PYTHON][SQL] Add from_xml and schema_of_xml to pyspark, spark connect and sql function 89041a4a8c7 is described below commit 89041a4a8c7b7787fa10f090d4324f20447c4dd3 Author: Sandip Agarwala <131817656+sandip...@users.noreply.github.com> AuthorDate: Mon Sep 18 16:35:22 2023 +0900 [SPARK-44788][CONNECT][PYTHON][SQL] Add from_xml and schema_of_xml to pyspark, spark connect and sql function ### What changes were proposed in this pull request? Add from_xml and schema_of_xml to pyspark, spark connect and sql function ### Why are the changes needed? from_xml parses XML data nested in a `Column` into a struct. schema_of_xml infers schema from XML data in a `Column`. This PR adds these two functions to pyspark, spark connect and SQL function registry. It is one of the series of PR to add native support for [XML File Format](https://issues.apache.org/jira/browse/SPARK-44265) in spark. ### Does this PR introduce _any_ user-facing change? Yes, it adds from_xml and schema_of_xml to pyspark, spark connect and sql function ### How was this patch tested? - Added new unit tests - Github Action ### Was this patch authored or co-authored using generative AI tooling? No Closes #42938 from sandip-db/from_xml-master. Authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../scala/org/apache/spark/sql/functions.scala | 51 +++- .../org/apache/spark/sql/FunctionTestSuite.scala | 10 + python/pyspark/errors/error_classes.py | 5 + python/pyspark/sql/connect/functions.py | 46 +++ python/pyspark/sql/functions.py | 129 +++++++++ .../sql/tests/connect/test_connect_function.py | 106 +++++++ python/pyspark/sql/tests/test_functions.py | 28 +- .../sql/catalyst/analysis/FunctionRegistry.scala | 6 +- .../sql/catalyst/expressions/xmlExpressions.scala | 26 +- .../scala/org/apache/spark/sql/functions.scala | 60 +++- .../sql-functions/sql-expression-schema.md | 4 +- .../analyzer-results/xml-functions.sql.out | 271 +++++++++++++++++ .../resources/sql-tests/inputs/xml-functions.sql | 50 ++++ .../sql-tests/results/xml-functions.sql.out | 319 +++++++++++++++++++++ .../apache/spark/sql/DataFrameFunctionsSuite.scala | 5 +- .../sql/execution/datasources/xml/XmlSuite.scala | 2 +- 16 files changed, 1093 insertions(+), 25 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 83f0ee64501..b94a33007b1 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.api.java._ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.PrimitiveLongEncoder import org.apache.spark.sql.connect.common.LiteralValueProtoConverter._ import org.apache.spark.sql.connect.common.UdfUtils +import org.apache.spark.sql.errors.DataTypeErrors import org.apache.spark.sql.expressions.{ScalarUserDefinedFunction, UserDefinedFunction} import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.types.DataType.parseTypeWithFallback @@ -7311,15 +7312,59 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data * Source Option</a> in the version you use. * @group collection_funcs + * @since 4.0.0 + */ + // scalastyle:on line.size.limit + def from_xml(e: Column, schema: StructType, options: java.util.Map[String, String]): Column = + from_xml(e, lit(schema.json), options.asScala.toIterator) + + // scalastyle:off line.size.limit + + /** + * (Java-specific) Parses a column containing a XML string into a `StructType` with the + * specified schema. Returns `null`, in the case of an unparseable string. * + * @param e + * a string column containing XML data. + * @param schema + * the schema as a DDL-formatted string. + * @param options + * options to control how the XML is parsed. accepts the same options and the xml data source. + * See <a href= + * "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data + * Source Option</a> in the version you use. + * @group collection_funcs * @since 4.0.0 */ // scalastyle:on line.size.limit - def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column = - from_xml(e, lit(schema.toDDL), options.iterator) + def from_xml(e: Column, schema: String, options: java.util.Map[String, String]): Column = { + val dataType = + parseTypeWithFallback(schema, DataType.fromJson, fallbackParser = DataType.fromDDL) + val structType = dataType match { + case t: StructType => t + case _ => throw DataTypeErrors.failedParsingStructTypeError(schema) + } + from_xml(e, structType, options) + } // scalastyle:off line.size.limit + /** + * (Java-specific) Parses a column containing a XML string into a `StructType` with the + * specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e + * a string column containing XML data. + * @param schema + * the schema to use when parsing the XML string + * @group collection_funcs + * @since 4.0.0 + */ + // scalastyle:on line.size.limit + def from_xml(e: Column, schema: Column): Column = { + from_xml(e, schema, Iterator.empty) + } + // scalastyle:off line.size.limit /** * (Java-specific) Parses a column containing a XML string into the data type corresponding to * the specified schema. Returns `null`, in the case of an unparseable string. @@ -7354,7 +7399,7 @@ object functions { * @since 4.0.0 */ def from_xml(e: Column, schema: StructType): Column = - from_xml(e, schema, Map.empty[String, String]) + from_xml(e, schema, Map.empty[String, String].asJava) private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = { fnWithOptions("from_xml", options, e, schema) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala index 78cc26d627c..1240fbc9ada 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala @@ -229,6 +229,16 @@ class FunctionTestSuite extends ConnectFunSuite { schema_of_csv("x,y"), schema_of_csv(lit("x,y"), Collections.emptyMap())) testEquals("to_csv", to_csv(a), to_csv(a, Collections.emptyMap[String, String])) + testEquals( + "from_xml", + from_xml(a, schema), + from_xml(a, lit(schema.json)), + from_xml(a, schema.json, Collections.emptyMap[String, String]), + from_xml(a, schema.json, Map.empty[String, String].asJava), + from_xml(a, schema, Map.empty[String, String].asJava), + from_xml(a, schema, Collections.emptyMap[String, String]), + from_xml(a, lit(schema.json), Collections.emptyMap[String, String])) + testEquals( "from_avro", avroFn.from_avro(a, """{"type": "int", "name": "id"}"""), diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index c98e9feb610..eaefbf4d576 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -477,6 +477,11 @@ ERROR_CLASSES_JSON = """ "Argument `<arg_name>` should be a Column or str, got <arg_type>." ] }, + "NOT_COLUMN_OR_STR_OR_STRUCT" : { + "message" : [ + "Argument `<arg_name>` should be a StructType, Column or str, got <arg_type>." + ] + }, "NOT_DATAFRAME" : { "message" : [ "Argument `<arg_name>` should be a DataFrame, got <arg_type>." diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py index f89b1aae500..24b552a45e6 100644 --- a/python/pyspark/sql/connect/functions.py +++ b/python/pyspark/sql/connect/functions.py @@ -1855,6 +1855,32 @@ def from_json( from_json.__doc__ = pysparkfuncs.from_json.__doc__ +def from_xml( + col: "ColumnOrName", + schema: Union[StructType, Column, str], + options: Optional[Dict[str, str]] = None, +) -> Column: + if isinstance(schema, Column): + _schema = schema + elif isinstance(schema, StructType): + _schema = lit(schema.json()) + elif isinstance(schema, str): + _schema = lit(schema) + else: + raise PySparkTypeError( + error_class="NOT_COLUMN_OR_STR_OR_STRUCT", + message_parameters={"arg_name": "schema", "arg_type": type(schema).__name__}, + ) + + if options is None: + return _invoke_function("from_xml", _to_col(col), _schema) + else: + return _invoke_function("from_xml", _to_col(col), _schema, _options_to_col(options)) + + +from_xml.__doc__ = pysparkfuncs.from_xml.__doc__ + + def get(col: "ColumnOrName", index: Union["ColumnOrName", int]) -> Column: index = lit(index) if isinstance(index, int) else index @@ -2064,6 +2090,26 @@ def schema_of_json(json: "ColumnOrName", options: Optional[Dict[str, str]] = Non schema_of_json.__doc__ = pysparkfuncs.schema_of_json.__doc__ +def schema_of_xml(xml: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column: + if isinstance(xml, Column): + _xml = xml + elif isinstance(xml, str): + _xml = lit(xml) + else: + raise PySparkTypeError( + error_class="NOT_COLUMN_OR_STR", + message_parameters={"arg_name": "xml", "arg_type": type(xml).__name__}, + ) + + if options is None: + return _invoke_function("schema_of_xml", _xml) + else: + return _invoke_function("schema_of_xml", _xml, _options_to_col(options)) + + +schema_of_xml.__doc__ = pysparkfuncs.schema_of_xml.__doc__ + + def shuffle(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("shuffle", col) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 1e12b9bf469..3c65e8d9162 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -13058,6 +13058,135 @@ def json_object_keys(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("json_object_keys", col) +@_try_remote_functions +def from_xml( + col: "ColumnOrName", + schema: Union[StructType, Column, str], + options: Optional[Dict[str, str]] = None, +) -> Column: + """ + Parses a column containing a XML string to a row with + the specified schema. Returns `null`, in the case of an unparseable string. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + a column or column name in XML format + schema : :class:`StructType`, :class:`~pyspark.sql.Column` or str + a StructType, Column or Python string literal with a DDL-formatted string + to use when parsing the Xml column + options : dict, optional + options to control parsing. accepts the same options as the Xml datasource. + See `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option>`_ + for the version you use. + + .. # noqa + + Returns + ------- + :class:`~pyspark.sql.Column` + a new column of complex type from given XML object. + + Examples + -------- + >>> from pyspark.sql.types import * + >>> from pyspark.sql.functions import from_xml, schema_of_xml, lit + + StructType input with simple IntegerType. + + >>> data = [(1, '''<p><a>1</a></p>''')] + >>> df = spark.createDataFrame(data, ("key", "value")) + + TODO: Fix StructType for spark connect + schema = StructType([StructField("a", IntegerType())]) + + >>> schema = "STRUCT<a: BIGINT>" + >>> df.select(from_xml(df.value, schema).alias("xml")).collect() + [Row(xml=Row(a=1))] + + String input. + + >>> df.select(from_xml(df.value, "a INT").alias("xml")).collect() + [Row(xml=Row(a=1))] + + >>> data = [(1, '<p><a>1</a><a>2</a></p>')] + >>> df = spark.createDataFrame(data, ("key", "value")) + + TODO: Fix StructType for spark connect + schema = StructType([StructField("a", ArrayType(IntegerType()))]) + + >>> schema = "STRUCT<a: ARRAY<BIGINT>>" + >>> df.select(from_xml(df.value, schema).alias("xml")).collect() + [Row(xml=Row(a=[1, 2]))] + + Column input generated by schema_of_xml. + + >>> schema = schema_of_xml(lit(data[0][1])) + >>> df.select(from_xml(df.value, schema).alias("xml")).collect() + [Row(xml=Row(a=[1, 2]))] + """ + + if isinstance(schema, StructType): + schema = schema.json() + elif isinstance(schema, Column): + schema = _to_java_column(schema) + elif not isinstance(schema, str): + raise PySparkTypeError( + error_class="NOT_COLUMN_OR_STR_OR_STRUCT", + message_parameters={"arg_name": "schema", "arg_type": type(schema).__name__}, + ) + return _invoke_function("from_xml", _to_java_column(col), schema, _options_to_str(options)) + + +@_try_remote_functions +def schema_of_xml(xml: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column: + """ + Parses a XML string and infers its schema in DDL format. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + xml : :class:`~pyspark.sql.Column` or str + a XML string or a foldable string column containing a XML string. + options : dict, optional + options to control parsing. accepts the same options as the XML datasource. + See `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option>`_ + for the version you use. + + .. # noqa + + Returns + ------- + :class:`~pyspark.sql.Column` + a string representation of a :class:`StructType` parsed from given XML. + + Examples + -------- + >>> df = spark.range(1) + >>> df.select(schema_of_xml(lit('<p><a>1</a></p>')).alias("xml")).collect() + [Row(xml='STRUCT<a: BIGINT>')] + >>> df.select(schema_of_xml(lit('<p><a>1</a><a>2</a></p>')).alias("xml")).collect() + [Row(xml='STRUCT<a: ARRAY<BIGINT>>')] + >>> schema = schema_of_xml('<p><a attr="2">1</a></p>', {'excludeAttribute':'true'}) + >>> df.select(schema.alias("xml")).collect() + [Row(xml='STRUCT<a: BIGINT>')] + """ + if isinstance(xml, str): + col = _create_column_from_literal(xml) + elif isinstance(xml, Column): + col = _to_java_column(xml) + else: + raise PySparkTypeError( + error_class="NOT_COLUMN_OR_STR", + message_parameters={"arg_name": "xml", "arg_type": type(xml).__name__}, + ) + + return _invoke_function("schema_of_xml", col, _options_to_str(options)) + + @_try_remote_functions def schema_of_csv(csv: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column: """ diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py index f5aaf535de7..f958b5fb574 100644 --- a/python/pyspark/sql/tests/connect/test_connect_function.py +++ b/python/pyspark/sql/tests/connect/test_connect_function.py @@ -1817,6 +1817,112 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S sdf.select(SF.to_json(SF.struct(SF.lit("a"), SF.lit("b")), {"mode": "FAILFAST"})), ) + def test_xml_functions(self): + query = """ + SELECT * FROM VALUES + ('<p><a>1</a></p>', '<p><a>1</a><a>2</a><a>3</a></p>', + '<p><a attr="s"><b>5.0</b></a></p>'), + ('<p><a>0</a></p>', '<p><a>4</a><a>5</a><a>6</a></p>', '<p><a attr="t"></a></p>') + AS tab(a, b, c) + """ + # +---------------+-------------------------------+---------------------------------+ + # | a| b| c| + # +---------------+-------------------------------+---------------------------------+ + # |<p><a>1</a></p>|<p><a>1</a><a>2</a><a>3</a></p>|<p><a attr="s"><b>5.0</b></a></p>| + # |<p><a>1</a></p>|<p><a>4</a><a>5</a><a>6</a></p>| <p><a attr="t"></a></p>| + # +---------------+-------------------------------+---------------------------------+ + + cdf = self.connect.sql(query) + sdf = self.spark.sql(query) + + # test from_xml + # TODO(SPARK-45190): Address StructType schema parse error + for schema in [ + "a INT", + # StructType([StructField("a", IntegerType())]), + # StructType([StructField("a", ArrayType(IntegerType()))]), + ]: + self.compare_by_show( + cdf.select(CF.from_xml(cdf.a, schema)), + sdf.select(SF.from_xml(sdf.a, schema)), + ) + self.compare_by_show( + cdf.select(CF.from_xml("a", schema)), + sdf.select(SF.from_xml("a", schema)), + ) + self.compare_by_show( + cdf.select(CF.from_xml(cdf.a, schema, {"mode": "FAILFAST"})), + sdf.select(SF.from_xml(sdf.a, schema, {"mode": "FAILFAST"})), + ) + self.compare_by_show( + cdf.select(CF.from_xml("a", schema, {"mode": "FAILFAST"})), + sdf.select(SF.from_xml("a", schema, {"mode": "FAILFAST"})), + ) + + for schema in [ + "STRUCT<a: ARRAY<INT>>", + # StructType([StructField("a", ArrayType(IntegerType()))]), + ]: + self.compare_by_show( + cdf.select(CF.from_xml(cdf.b, schema)), + sdf.select(SF.from_xml(sdf.b, schema)), + ) + self.compare_by_show( + cdf.select(CF.from_xml("b", schema)), + sdf.select(SF.from_xml("b", schema)), + ) + self.compare_by_show( + cdf.select(CF.from_xml(cdf.b, schema, {"mode": "FAILFAST"})), + sdf.select(SF.from_xml(sdf.b, schema, {"mode": "FAILFAST"})), + ) + self.compare_by_show( + cdf.select(CF.from_xml("b", schema, {"mode": "FAILFAST"})), + sdf.select(SF.from_xml("b", schema, {"mode": "FAILFAST"})), + ) + + c_schema = CF.schema_of_xml(CF.lit("""<p><a>1</a></p>""")) + s_schema = SF.schema_of_xml(SF.lit("""<p><a>1</a></p>""")) + + self.compare_by_show( + cdf.select(CF.from_xml(cdf.a, c_schema)), + sdf.select(SF.from_xml(sdf.a, s_schema)), + ) + self.compare_by_show( + cdf.select(CF.from_xml("a", c_schema)), + sdf.select(SF.from_xml("a", s_schema)), + ) + self.compare_by_show( + cdf.select(CF.from_xml(cdf.a, c_schema, {"mode": "FAILFAST"})), + sdf.select(SF.from_xml(sdf.a, s_schema, {"mode": "FAILFAST"})), + ) + self.compare_by_show( + cdf.select(CF.from_xml("a", c_schema, {"mode": "FAILFAST"})), + sdf.select(SF.from_xml("a", s_schema, {"mode": "FAILFAST"})), + ) + + with self.assertRaises(PySparkTypeError) as pe: + CF.from_xml("a", [c_schema]) + + self.check_error( + exception=pe.exception, + error_class="NOT_COLUMN_OR_STR_OR_STRUCT", + message_parameters={"arg_name": "schema", "arg_type": "list"}, + ) + + # test schema_of_xml + self.assert_eq( + cdf.select(CF.schema_of_xml(CF.lit("<p><a>1</a></p>"))).toPandas(), + sdf.select(SF.schema_of_xml(SF.lit("<p><a>1</a></p>"))).toPandas(), + ) + self.assert_eq( + cdf.select( + CF.schema_of_xml(CF.lit("<p><a>1</a></p>"), {"mode": "FAILFAST"}) + ).toPandas(), + sdf.select( + SF.schema_of_xml(SF.lit("<p><a>1</a></p>"), {"mode": "FAILFAST"}) + ).toPandas(), + ) + def test_string_functions_one_arg(self): query = """ SELECT * FROM VALUES diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index e643a6c07fa..b0ad311d733 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -82,12 +82,7 @@ class FunctionsTestsMixin: missing_in_py = jvm_fn_set.difference(py_fn_set) # Functions that we expect to be missing in python until they are added to pyspark - expected_missing_in_py = { - # TODO: XML functions will soon be added and removed from this list - # https://issues.apache.org/jira/browse/SPARK-44788 - "from_xml", - "schema_of_xml", - } + expected_missing_in_py = set() self.assertEqual( expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected" @@ -1286,6 +1281,27 @@ class FunctionsTestsMixin: message_parameters={"arg_name": "schema", "arg_type": "int"}, ) + def test_schema_of_xml(self): + with self.assertRaises(PySparkTypeError) as pe: + F.schema_of_xml(1) + + self.check_error( + exception=pe.exception, + error_class="NOT_COLUMN_OR_STR", + message_parameters={"arg_name": "xml", "arg_type": "int"}, + ) + + def test_from_xml(self): + df = self.spark.range(10) + with self.assertRaises(PySparkTypeError) as pe: + F.from_xml(df.id, 1) + + self.check_error( + exception=pe.exception, + error_class="NOT_COLUMN_OR_STR_OR_STRUCT", + message_parameters={"arg_name": "schema", "arg_type": "int"}, + ) + def test_greatest(self): df = self.spark.range(10) with self.assertRaises(PySparkValueError) as pe: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index af9c095deb9..8be3199ef9b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -830,7 +830,11 @@ object FunctionRegistry { // csv expression[CsvToStructs]("from_csv"), expression[SchemaOfCsv]("schema_of_csv"), - expression[StructsToCsv]("to_csv") + expression[StructsToCsv]("to_csv"), + + // Xml + expression[XmlToStructs]("from_xml"), + expression[SchemaOfXml]("schema_of_xml") ) val builtin: SimpleFunctionRegistry = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala index ddc63b25903..c0fd725943d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala @@ -28,6 +28,26 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +/** + * Converts an XML input string to a [[StructType]] with the specified schema. + * It is assumed that the XML input string constitutes a single record; so the + * [[rowTag]] option will be not applicable. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(xmlStr, schema[, options]) - Returns a struct value with the given `xmlStr` and `schema`.", + examples = """ + Examples: + > SELECT _FUNC_('<p><a>1</a><b>0.8</b></p>', 'a INT, b DOUBLE'); + {"a":1,"b":0.8} + > SELECT _FUNC_('<p><time>26/08/2015</time></p>', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')); + {"time":2015-08-26 00:00:00} + > SELECT _FUNC_('<p><teacher>Alice</teacher><student><name>Bob</name><rank>1</rank></student><student><name>Charlie</name><rank>2</rank></student></p>', 'STRUCT<teacher: STRING, student: ARRAY<STRUCT<name: STRING, rank: INT>>>'); + {"teacher":"Alice","student":[{"name":"Bob","rank":1},{"name":"Charlie","rank":2}]} + """, + group = "xml_funcs", + since = "4.0.0") +// scalastyle:on line.size.limit case class XmlToStructs( schema: DataType, options: Map[String, String], @@ -138,8 +158,10 @@ case class XmlToStructs( usage = "_FUNC_(xml[, options]) - Returns schema in the DDL format of XML string.", examples = """ Examples: - > SELECT _FUNC_('1,abc'); - STRUCT<_c0: INT, _c1: STRING> + > SELECT _FUNC_('<p><a>1</a></p>'); + STRUCT<a: BIGINT> + > SELECT _FUNC_('<p><a attr="2">1</a><a>3</a></p>', map('excludeAttribute', 'true')); + STRUCT<a: ARRAY<BIGINT>> """, since = "4.0.0", group = "xml_funcs") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index dcde01ec408..a4b8f1b1b68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.xml._ import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, ResolvedHint} import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TimestampFormatter} -import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.errors.{DataTypeErrors, QueryCompilationErrors} import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.expressions.{Aggregator, SparkUserDefinedFunction, UserDefinedAggregator, UserDefinedFunction} import org.apache.spark.sql.internal.SQLConf @@ -7381,15 +7381,61 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> * Data Source Option</a> in the version you use. * @group collection_funcs - * @since + * @since 4.0.0 */ // scalastyle:on line.size.limit - def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr { - XmlToStructs(CharVarcharUtils.failIfHasCharVarchar(schema), options, e.expr) + def from_xml(e: Column, schema: StructType, options: java.util.Map[String, String]): Column = { + withExpr(XmlToStructs(CharVarcharUtils.failIfHasCharVarchar(schema), + options.asScala.toMap, e.expr)) + } + + // scalastyle:off line.size.limit + /** + * (Java-specific) Parses a column containing a XML string into a `StructType` + * with the specified schema. + * Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing XML data. + * @param schema the schema as a DDL-formatted string. + * @param options options to control how the XML is parsed. accepts the same options and the + * xml data source. + * See + * <a href= + * "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> + * Data Source Option</a> in the version you use. + * @group collection_funcs + * @since 4.0.0 + */ + // scalastyle:on line.size.limit + def from_xml(e: Column, schema: String, options: java.util.Map[String, String]): Column = { + val dataType = parseTypeWithFallback( + schema, + DataType.fromJson, + fallbackParser = DataType.fromDDL) + val structType = dataType match { + case t: StructType => t + case _ => throw DataTypeErrors.failedParsingStructTypeError(schema) + } + from_xml(e, structType, options) } // scalastyle:off line.size.limit + /** + * (Java-specific) Parses a column containing a XML string into a `StructType` + * with the specified schema. Returns `null`, in the case of an unparseable string. + * + * @param e a string column containing XML data. + * @param schema the schema to use when parsing the XML string + * @group collection_funcs + * @since 4.0.0 + */ + // scalastyle:on line.size.limit + def from_xml(e: Column, schema: Column): Column = { + from_xml(e, schema, Map.empty[String, String].asJava) + } + + // scalastyle:off line.size.limit /** * (Java-specific) Parses a column containing a XML string into a `StructType` * with the specified schema. Returns `null`, in the case of an unparseable string. @@ -7403,7 +7449,7 @@ object functions { * "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> * Data Source Option</a> in the version you use. * @group collection_funcs - * @since + * @since 4.0.0 */ // scalastyle:on line.size.limit def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column = { @@ -7419,10 +7465,10 @@ object functions { * @param schema the schema to use when parsing the XML string * @group collection_funcs - * @since + * @since 4.0.0 */ def from_xml(e: Column, schema: StructType): Column = - from_xml(e, schema, Map.empty[String, String]) + from_xml(e, schema, Map.empty[String, String].asJava) /** * Parses a XML string and infers its schema in DDL format. diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index 9e06d5ac58a..d21ceaeb14b 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -426,6 +426,7 @@ | org.apache.spark.sql.catalyst.expressions.aggregate.VariancePop | var_pop | SELECT var_pop(col) FROM VALUES (1), (2), (3) AS tab(col) | struct<var_pop(col):double> | | org.apache.spark.sql.catalyst.expressions.aggregate.VarianceSamp | var_samp | SELECT var_samp(col) FROM VALUES (1), (2), (3) AS tab(col) | struct<var_samp(col):double> | | org.apache.spark.sql.catalyst.expressions.aggregate.VarianceSamp | variance | SELECT variance(col) FROM VALUES (1), (2), (3) AS tab(col) | struct<variance(col):double> | +| org.apache.spark.sql.catalyst.expressions.xml.SchemaOfXml | schema_of_xml | SELECT schema_of_xml('<p><a>1</a></p>') | struct<schema_of_xml(<p><a>1</a></p>):string> | | org.apache.spark.sql.catalyst.expressions.xml.XPathBoolean | xpath_boolean | SELECT xpath_boolean('<a><b>1</b></a>','a/b') | struct<xpath_boolean(<a><b>1</b></a>, a/b):boolean> | | org.apache.spark.sql.catalyst.expressions.xml.XPathDouble | xpath_double | SELECT xpath_double('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_double(<a><b>1</b><b>2</b></a>, sum(a/b)):double> | | org.apache.spark.sql.catalyst.expressions.xml.XPathDouble | xpath_number | SELECT xpath_number('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_number(<a><b>1</b><b>2</b></a>, sum(a/b)):double> | @@ -434,4 +435,5 @@ | org.apache.spark.sql.catalyst.expressions.xml.XPathList | xpath | SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()') | struct<xpath(<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>, a/b/text()):array<string>> | | org.apache.spark.sql.catalyst.expressions.xml.XPathLong | xpath_long | SELECT xpath_long('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_long(<a><b>1</b><b>2</b></a>, sum(a/b)):bigint> | | org.apache.spark.sql.catalyst.expressions.xml.XPathShort | xpath_short | SELECT xpath_short('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_short(<a><b>1</b><b>2</b></a>, sum(a/b)):smallint> | -| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> | \ No newline at end of file +| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> | +| org.apache.spark.sql.catalyst.expressions.xml.XmlToStructs | from_xml | SELECT from_xml('<p><a>1</a><b>0.8</b></p>', 'a INT, b DOUBLE') | struct<from_xml(<p><a>1</a><b>0.8</b></p>):struct<a:int,b:double>> | \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out new file mode 100644 index 00000000000..e62f4aab344 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/xml-functions.sql.out @@ -0,0 +1,271 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +select from_xml('<p><a>1</a></p>', 'a INT') +-- !query analysis +Project [from_xml(StructField(a,IntegerType,true), <p><a>1</a></p>, Some(America/Los_Angeles)) AS from_xml(<p><a>1</a></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml('<p><time>26/08/2015</time></p>', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) +-- !query analysis +Project [from_xml(StructField(time,TimestampType,true), (timestampFormat,dd/MM/yyyy), <p><time>26/08/2015</time></p>, Some(America/Los_Angeles)) AS from_xml(<p><time>26/08/2015</time></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml('<p><a>1</a></p>', 1) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_SCHEMA.NON_STRING_LITERAL", + "sqlState" : "42K07", + "messageParameters" : { + "inputSchema" : "\"1\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 37, + "fragment" : "from_xml('<p><a>1</a></p>', 1)" + } ] +} + + +-- !query +select from_xml('<p><a>1</a></p>', 'a InvalidType') +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'InvalidType'", + "hint" : ": extra input 'InvalidType'" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 51, + "fragment" : "from_xml('<p><a>1</a></p>', 'a InvalidType')" + } ] +} + + +-- !query +select from_xml('<p><a>1</a></p>', 'a INT', named_struct('mode', 'PERMISSIVE')) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_OPTIONS.NON_MAP_FUNCTION", + "sqlState" : "42K06", + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 79, + "fragment" : "from_xml('<p><a>1</a></p>', 'a INT', named_struct('mode', 'PERMISSIVE'))" + } ] +} + + +-- !query +select from_xml('<p><a>1</a></p>', 'a INT', map('mode', 1)) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_OPTIONS.NON_STRING_TYPE", + "sqlState" : "42K06", + "messageParameters" : { + "mapType" : "\"MAP<STRING, INT>\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 59, + "fragment" : "from_xml('<p><a>1</a></p>', 'a INT', map('mode', 1))" + } ] +} + + +-- !query +select from_xml() +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION", + "sqlState" : "42605", + "messageParameters" : { + "actualNum" : "0", + "docroot" : "https://spark.apache.org/docs/latest", + "expectedNum" : "[2, 3]", + "functionName" : "`from_xml`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 17, + "fragment" : "from_xml()" + } ] +} + + +-- !query +DROP VIEW IF EXISTS xmlTable +-- !query analysis +DropTableCommand `spark_catalog`.`default`.`xmlTable`, true, true, false + + +-- !query +select from_xml('<p><a>1</a></p>', 'struct<a:array<int>>') +-- !query analysis +Project [from_xml(StructField(a,ArrayType(IntegerType,true),true), <p><a>1</a></p>, Some(America/Los_Angeles)) AS from_xml(<p><a>1</a></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml('<p><a>1</a><b>"2"</b></p>', 'struct<a:int,b:string>') +-- !query analysis +Project [from_xml(StructField(a,IntegerType,true), StructField(b,StringType,true), <p><a>1</a><b>"2"</b></p>, Some(America/Los_Angeles)) AS from_xml(<p><a>1</a><b>"2"</b></p>)#x] ++- OneRowRelation + + +-- !query +select schema_of_xml('<p><a>1</a><b>"2"</b></p>') +-- !query analysis +Project [schema_of_xml(<p><a>1</a><b>"2"</b></p>) AS schema_of_xml(<p><a>1</a><b>"2"</b></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml('<p><a>1</a><a>2</a><a>3</a></p>', schema_of_xml('<p><a>1</a><a>2</a></p>')) +-- !query analysis +Project [from_xml(StructField(a,ArrayType(LongType,true),true), <p><a>1</a><a>2</a><a>3</a></p>, Some(America/Los_Angeles)) AS from_xml(<p><a>1</a><a>2</a><a>3</a></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml('<p><a>1</a><a>2</a></p>', 'struct<a:array<int>>') +-- !query analysis +Project [from_xml(StructField(a,ArrayType(IntegerType,true),true), <p><a>1</a><a>2</a></p>, Some(America/Los_Angeles)) AS from_xml(<p><a>1</a><a>2</a></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml('<p><a>1</a><a>"2"</a></p>', 'struct<a:array<int>>') +-- !query analysis +Project [from_xml(StructField(a,ArrayType(IntegerType,true),true), <p><a>1</a><a>"2"</a></p>, Some(America/Los_Angeles)) AS from_xml(<p><a>1</a><a>"2"</a></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml('<p><a>1</a><a></a></p>', 'struct<a:array<int>>') +-- !query analysis +Project [from_xml(StructField(a,ArrayType(IntegerType,true),true), <p><a>1</a><a></a></p>, Some(America/Los_Angeles)) AS from_xml(<p><a>1</a><a></a></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml('<p><a attr="1"><b>2</b></a></p>', 'struct<a:map<string,int>>') +-- !query analysis +Project [from_xml(StructField(a,MapType(StringType,IntegerType,true),true), <p><a attr="1"><b>2</b></a></p>, Some(America/Los_Angeles)) AS from_xml(<p><a attr="1"><b>2</b></a></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml('<p><d>2012-12-15</d><t>2012-12-15 15:15:15</t></p>', 'd date, t timestamp') +-- !query analysis +Project [from_xml(StructField(d,DateType,true), StructField(t,TimestampType,true), <p><d>2012-12-15</d><t>2012-12-15 15:15:15</t></p>, Some(America/Los_Angeles)) AS from_xml(<p><d>2012-12-15</d><t>2012-12-15 15:15:15</t></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml( + '<p><d>12/15 2012</d><t>12/15 2012 15:15:15</t>}</p>', + 'd date, t timestamp', + map('dateFormat', 'MM/dd yyyy', 'timestampFormat', 'MM/dd yyyy HH:mm:ss')) +-- !query analysis +Project [from_xml(StructField(d,DateType,true), StructField(t,TimestampType,true), (dateFormat,MM/dd yyyy), (timestampFormat,MM/dd yyyy HH:mm:ss), <p><d>12/15 2012</d><t>12/15 2012 15:15:15</t>}</p>, Some(America/Los_Angeles)) AS from_xml(<p><d>12/15 2012</d><t>12/15 2012 15:15:15</t>}</p>)#x] ++- OneRowRelation + + +-- !query +select from_xml( + '<p><d>02-29</d></p>', + 'd date', + map('dateFormat', 'MM-dd')) +-- !query analysis +Project [from_xml(StructField(d,DateType,true), (dateFormat,MM-dd), <p><d>02-29</d></p>, Some(America/Los_Angeles)) AS from_xml(<p><d>02-29</d></p>)#x] ++- OneRowRelation + + +-- !query +select from_xml( + '<p><t>02-29</t></p>', + 't timestamp', + map('timestampFormat', 'MM-dd')) +-- !query analysis +Project [from_xml(StructField(t,TimestampType,true), (timestampFormat,MM-dd), <p><t>02-29</t></p>, Some(America/Los_Angeles)) AS from_xml(<p><t>02-29</t></p>)#x] ++- OneRowRelation + + +-- !query +select schema_of_xml(null) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_NULL", + "sqlState" : "42K09", + "messageParameters" : { + "exprName" : "xml", + "sqlExpr" : "\"schema_of_xml(NULL)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 26, + "fragment" : "schema_of_xml(null)" + } ] +} + + +-- !query +CREATE TEMPORARY VIEW xmlTable(xmlField, a) AS SELECT * FROM VALUES ('<p><a>1</a><b>"2"</b></p>', 'a') +-- !query analysis +CreateViewCommand `xmlTable`, [(xmlField,None), (a,None)], SELECT * FROM VALUES ('<p><a>1</a><b>"2"</b></p>', 'a'), false, false, LocalTempView, true + +- Project [col1#x, col2#x] + +- LocalRelation [col1#x, col2#x] + + +-- !query +SELECT schema_of_xml(xmlField) FROM xmlTable +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT", + "sqlState" : "42K09", + "messageParameters" : { + "inputExpr" : "\"xmlField\"", + "inputName" : "`xml`", + "inputType" : "\"STRING\"", + "sqlExpr" : "\"schema_of_xml(xmlField)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 30, + "fragment" : "schema_of_xml(xmlField)" + } ] +} + + +-- !query +DROP VIEW IF EXISTS xmlTable +-- !query analysis +DropTempViewCommand xmlTable diff --git a/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql new file mode 100644 index 00000000000..cdf56712b11 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/xml-functions.sql @@ -0,0 +1,50 @@ +-- from_json +select from_xml('<p><a>1</a></p>', 'a INT'); +select from_xml('<p><time>26/08/2015</time></p>', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')); +-- Check if errors handled +select from_xml('<p><a>1</a></p>', 1); +select from_xml('<p><a>1</a></p>', 'a InvalidType'); +select from_xml('<p><a>1</a></p>', 'a INT', named_struct('mode', 'PERMISSIVE')); +select from_xml('<p><a>1</a></p>', 'a INT', map('mode', 1)); +select from_xml(); + +-- Clean up +DROP VIEW IF EXISTS xmlTable; + +-- from_json - complex types +select from_xml('<p><a>1</a></p>', 'struct<a:array<int>>'); +select from_xml('<p><a>1</a><b>"2"</b></p>', 'struct<a:int,b:string>'); + +-- infer schema of json literal +select schema_of_xml('<p><a>1</a><b>"2"</b></p>'); +select from_xml('<p><a>1</a><a>2</a><a>3</a></p>', schema_of_xml('<p><a>1</a><a>2</a></p>')); + +-- from_json - array type +select from_xml('<p><a>1</a><a>2</a></p>', 'struct<a:array<int>>'); +select from_xml('<p><a>1</a><a>"2"</a></p>', 'struct<a:array<int>>'); +select from_xml('<p><a>1</a><a></a></p>', 'struct<a:array<int>>'); + +select from_xml('<p><a attr="1"><b>2</b></a></p>', 'struct<a:map<string,int>>'); + +-- from_xml - datetime type +select from_xml('<p><d>2012-12-15</d><t>2012-12-15 15:15:15</t></p>', 'd date, t timestamp'); +select from_xml( + '<p><d>12/15 2012</d><t>12/15 2012 15:15:15</t>}</p>', + 'd date, t timestamp', + map('dateFormat', 'MM/dd yyyy', 'timestampFormat', 'MM/dd yyyy HH:mm:ss')); +select from_xml( + '<p><d>02-29</d></p>', + 'd date', + map('dateFormat', 'MM-dd')); +select from_xml( + '<p><t>02-29</t></p>', + 't timestamp', + map('timestampFormat', 'MM-dd')); + +-- infer schema of xml literal with options +select schema_of_xml(null); +CREATE TEMPORARY VIEW xmlTable(xmlField, a) AS SELECT * FROM VALUES ('<p><a>1</a><b>"2"</b></p>', 'a'); +SELECT schema_of_xml(xmlField) FROM xmlTable; + +-- Clean up +DROP VIEW IF EXISTS xmlTable; diff --git a/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out new file mode 100644 index 00000000000..61e8e9c8662 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/xml-functions.sql.out @@ -0,0 +1,319 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +select from_xml('<p><a>1</a></p>', 'a INT') +-- !query schema +struct<from_xml(<p><a>1</a></p>):struct<a:int>> +-- !query output +{"a":1} + + +-- !query +select from_xml('<p><time>26/08/2015</time></p>', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) +-- !query schema +struct<from_xml(<p><time>26/08/2015</time></p>):struct<time:timestamp>> +-- !query output +{"time":2015-08-26 00:00:00} + + +-- !query +select from_xml('<p><a>1</a></p>', 1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_SCHEMA.NON_STRING_LITERAL", + "sqlState" : "42K07", + "messageParameters" : { + "inputSchema" : "\"1\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 37, + "fragment" : "from_xml('<p><a>1</a></p>', 1)" + } ] +} + + +-- !query +select from_xml('<p><a>1</a></p>', 'a InvalidType') +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'InvalidType'", + "hint" : ": extra input 'InvalidType'" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 51, + "fragment" : "from_xml('<p><a>1</a></p>', 'a InvalidType')" + } ] +} + + +-- !query +select from_xml('<p><a>1</a></p>', 'a INT', named_struct('mode', 'PERMISSIVE')) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_OPTIONS.NON_MAP_FUNCTION", + "sqlState" : "42K06", + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 79, + "fragment" : "from_xml('<p><a>1</a></p>', 'a INT', named_struct('mode', 'PERMISSIVE'))" + } ] +} + + +-- !query +select from_xml('<p><a>1</a></p>', 'a INT', map('mode', 1)) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_OPTIONS.NON_STRING_TYPE", + "sqlState" : "42K06", + "messageParameters" : { + "mapType" : "\"MAP<STRING, INT>\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 59, + "fragment" : "from_xml('<p><a>1</a></p>', 'a INT', map('mode', 1))" + } ] +} + + +-- !query +select from_xml() +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION", + "sqlState" : "42605", + "messageParameters" : { + "actualNum" : "0", + "docroot" : "https://spark.apache.org/docs/latest", + "expectedNum" : "[2, 3]", + "functionName" : "`from_xml`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 17, + "fragment" : "from_xml()" + } ] +} + + +-- !query +DROP VIEW IF EXISTS xmlTable +-- !query schema +struct<> +-- !query output + + + +-- !query +select from_xml('<p><a>1</a></p>', 'struct<a:array<int>>') +-- !query schema +struct<from_xml(<p><a>1</a></p>):struct<a:array<int>>> +-- !query output +{"a":[1]} + + +-- !query +select from_xml('<p><a>1</a><b>"2"</b></p>', 'struct<a:int,b:string>') +-- !query schema +struct<from_xml(<p><a>1</a><b>"2"</b></p>):struct<a:int,b:string>> +-- !query output +{"a":1,"b":""2""} + + +-- !query +select schema_of_xml('<p><a>1</a><b>"2"</b></p>') +-- !query schema +struct<schema_of_xml(<p><a>1</a><b>"2"</b></p>):string> +-- !query output +STRUCT<a: BIGINT, b: STRING> + + +-- !query +select from_xml('<p><a>1</a><a>2</a><a>3</a></p>', schema_of_xml('<p><a>1</a><a>2</a></p>')) +-- !query schema +struct<from_xml(<p><a>1</a><a>2</a><a>3</a></p>):struct<a:array<bigint>>> +-- !query output +{"a":[1,2,3]} + + +-- !query +select from_xml('<p><a>1</a><a>2</a></p>', 'struct<a:array<int>>') +-- !query schema +struct<from_xml(<p><a>1</a><a>2</a></p>):struct<a:array<int>>> +-- !query output +{"a":[1,2]} + + +-- !query +select from_xml('<p><a>1</a><a>"2"</a></p>', 'struct<a:array<int>>') +-- !query schema +struct<from_xml(<p><a>1</a><a>"2"</a></p>):struct<a:array<int>>> +-- !query output +{"a":[1]} + + +-- !query +select from_xml('<p><a>1</a><a></a></p>', 'struct<a:array<int>>') +-- !query schema +struct<from_xml(<p><a>1</a><a></a></p>):struct<a:array<int>>> +-- !query output +{"a":[1,null]} + + +-- !query +select from_xml('<p><a attr="1"><b>2</b></a></p>', 'struct<a:map<string,int>>') +-- !query schema +struct<from_xml(<p><a attr="1"><b>2</b></a></p>):struct<a:map<string,int>>> +-- !query output +{"a":{"_attr":1,"b":2}} + + +-- !query +select from_xml('<p><d>2012-12-15</d><t>2012-12-15 15:15:15</t></p>', 'd date, t timestamp') +-- !query schema +struct<from_xml(<p><d>2012-12-15</d><t>2012-12-15 15:15:15</t></p>):struct<d:date,t:timestamp>> +-- !query output +{"d":2012-12-15,"t":2012-12-15 15:15:15} + + +-- !query +select from_xml( + '<p><d>12/15 2012</d><t>12/15 2012 15:15:15</t>}</p>', + 'd date, t timestamp', + map('dateFormat', 'MM/dd yyyy', 'timestampFormat', 'MM/dd yyyy HH:mm:ss')) +-- !query schema +struct<from_xml(<p><d>12/15 2012</d><t>12/15 2012 15:15:15</t>}</p>):struct<d:date,t:timestamp>> +-- !query output +{"d":2012-12-15,"t":2012-12-15 15:15:15} + + +-- !query +select from_xml( + '<p><d>02-29</d></p>', + 'd date', + map('dateFormat', 'MM-dd')) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkUpgradeException +{ + "errorClass" : "INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER", + "sqlState" : "42K0B", + "messageParameters" : { + "config" : "\"spark.sql.legacy.timeParserPolicy\"", + "datetime" : "'02-29'" + } +} + + +-- !query +select from_xml( + '<p><t>02-29</t></p>', + 't timestamp', + map('timestampFormat', 'MM-dd')) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkUpgradeException +{ + "errorClass" : "INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER", + "sqlState" : "42K0B", + "messageParameters" : { + "config" : "\"spark.sql.legacy.timeParserPolicy\"", + "datetime" : "'02-29'" + } +} + + +-- !query +select schema_of_xml(null) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_NULL", + "sqlState" : "42K09", + "messageParameters" : { + "exprName" : "xml", + "sqlExpr" : "\"schema_of_xml(NULL)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 26, + "fragment" : "schema_of_xml(null)" + } ] +} + + +-- !query +CREATE TEMPORARY VIEW xmlTable(xmlField, a) AS SELECT * FROM VALUES ('<p><a>1</a><b>"2"</b></p>', 'a') +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT schema_of_xml(xmlField) FROM xmlTable +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT", + "sqlState" : "42K09", + "messageParameters" : { + "inputExpr" : "\"xmlField\"", + "inputName" : "`xml`", + "inputType" : "\"STRING\"", + "sqlExpr" : "\"schema_of_xml(xmlField)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 30, + "fragment" : "schema_of_xml(xmlField)" + } ] +} + + +-- !query +DROP VIEW IF EXISTS xmlTable +-- !query schema +struct<> +-- !query output + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index 8ca14385e59..7044b7c90c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -82,10 +82,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { "bucket", "days", "hours", "months", "years", // Datasource v2 partition transformations "product", // Discussed in https://github.com/apache/spark/pull/30745 "unwrap_udt", - "collect_top_k", - // TODO: XML functions will soon be added to SQL Function registry and removed from this list - // https://issues.apache.org/jira/browse/SPARK-44787 - "from_xml", "schema_of_xml" + "collect_top_k" ) // We only consider functions matching this pattern, this excludes symbolic and other diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala index beea14e3cf8..b03d65219d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala @@ -1264,7 +1264,7 @@ class XmlSuite extends QueryTest with SharedSparkSession { val outputDF = Seq("0.0000", "0.01") .map { n => s"<Row> <Number>$n</Number> </Row>" } .toDF("xml") - .withColumn("parsed", from_xml($"xml", schema, Map("rowTag" -> "Row"))) + .withColumn("parsed", from_xml($"xml", schema, Map("rowTag" -> "Row").asJava)) .select("parsed.Number") val results = outputDF.collect() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org