This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4da9348160f [SPARK-43938][CONNECT][PYTHON] Add to_* functions to Scala and Python 4da9348160f is described below commit 4da9348160f522d6a5e7633a170d8d077100657f Author: panbingkun <pbk1...@gmail.com> AuthorDate: Mon Jun 12 08:50:59 2023 +0800 [SPARK-43938][CONNECT][PYTHON] Add to_* functions to Scala and Python ### What changes were proposed in this pull request? Add following functions: - str_to_map - to_binary - to_char - to_number - to_timestamp_ltz - to_timestamp_ntz - to_unix_timestamp to: - Scala API - Python API - Spark Connect Scala Client - Spark Connect Python Client ### Why are the changes needed? for parity ### Does this PR introduce _any_ user-facing change? Yes, new functions. ### How was this patch tested? - Add New UT. Closes #41505 from panbingkun/SPARK-43938. Lead-authored-by: panbingkun <pbk1...@gmail.com> Co-authored-by: panbingkun <84731...@qq.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- .../scala/org/apache/spark/sql/functions.scala | 160 +++++++++++++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 52 ++++ .../explain-results/function_str_to_map.explain | 2 + ...to_map_with_pair_and_keyValue_delimiter.explain | 2 + ...function_str_to_map_with_pair_delimiter.explain | 2 + .../explain-results/function_to_binary.explain | 2 + .../function_to_binary_with_format.explain | 2 + .../explain-results/function_to_char.explain | 2 + .../explain-results/function_to_number.explain | 2 + .../function_to_timestamp_ltz.explain | 2 + .../function_to_timestamp_ltz_with_format.explain | 2 + .../function_to_timestamp_ntz.explain | 2 + .../function_to_timestamp_ntz_with_format.explain | 2 + .../function_to_unix_timestamp.explain | 2 + .../function_to_unix_timestamp_with_format.explain | 2 + .../query-tests/queries/function_str_to_map.json | 25 ++ .../queries/function_str_to_map.proto.bin | Bin 0 -> 179 bytes ...tr_to_map_with_pair_and_keyValue_delimiter.json | 29 +++ 
..._map_with_pair_and_keyValue_delimiter.proto.bin | Bin 0 -> 186 bytes .../function_str_to_map_with_pair_delimiter.json | 33 +++ ...nction_str_to_map_with_pair_delimiter.proto.bin | Bin 0 -> 193 bytes .../query-tests/queries/function_to_binary.json | 25 ++ .../queries/function_to_binary.proto.bin | Bin 0 -> 178 bytes .../queries/function_to_binary_with_format.json | 29 +++ .../function_to_binary_with_format.proto.bin | Bin 0 -> 189 bytes .../query-tests/queries/function_to_char.json | 29 +++ .../query-tests/queries/function_to_char.proto.bin | Bin 0 -> 188 bytes .../query-tests/queries/function_to_number.json | 29 +++ .../queries/function_to_number.proto.bin | Bin 0 -> 188 bytes .../queries/function_to_timestamp_ltz.json | 25 ++ .../queries/function_to_timestamp_ltz.proto.bin | Bin 0 -> 185 bytes .../function_to_timestamp_ltz_with_format.json | 29 +++ ...function_to_timestamp_ltz_with_format.proto.bin | Bin 0 -> 192 bytes .../queries/function_to_timestamp_ntz.json | 25 ++ .../queries/function_to_timestamp_ntz.proto.bin | Bin 0 -> 185 bytes .../function_to_timestamp_ntz_with_format.json | 29 +++ ...function_to_timestamp_ntz_with_format.proto.bin | Bin 0 -> 192 bytes .../queries/function_to_unix_timestamp.json | 25 ++ .../queries/function_to_unix_timestamp.proto.bin | Bin 0 -> 186 bytes .../function_to_unix_timestamp_with_format.json | 29 +++ ...unction_to_unix_timestamp_with_format.proto.bin | Bin 0 -> 193 bytes .../source/reference/pyspark.sql/functions.rst | 7 + python/pyspark/sql/connect/functions.py | 77 ++++++ python/pyspark/sql/functions.py | 264 +++++++++++++++++++++ .../scala/org/apache/spark/sql/functions.scala | 175 ++++++++++++++ .../org/apache/spark/sql/DateFunctionsSuite.scala | 35 +++ .../apache/spark/sql/StringFunctionsSuite.scala | 61 +++++ 47 files changed, 1218 insertions(+) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index fc9eb074ca9..9c26037df84 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -1396,6 +1396,34 @@ object functions { def map_from_arrays(keys: Column, values: Column): Column = Column.fn("map_from_arrays", keys, values) + /** + * Creates a map after splitting the text into key/value pairs using delimiters. Both + * `pairDelim` and `keyValueDelim` are treated as regular expressions. + * + * @group map_funcs + * @since 3.5.0 + */ + def str_to_map(text: Column, pairDelim: Column, keyValueDelim: Column): Column = + Column.fn("str_to_map", text, pairDelim, keyValueDelim) + + /** + * Creates a map after splitting the text into key/value pairs using delimiters. The `pairDelim` + * is treated as a regular expression. + * + * @group map_funcs + * @since 3.5.0 + */ + def str_to_map(text: Column, pairDelim: Column): Column = + Column.fn("str_to_map", text, pairDelim) + + /** + * Creates a map after splitting the text into key/value pairs using delimiters. + * + * @group map_funcs + * @since 3.5.0 + */ + def str_to_map(text: Column): Column = Column.fn("str_to_map", text) + /** * Marks a DataFrame as small enough for use in broadcast joins. * @@ -3393,6 +3421,80 @@ object functions { */ def upper(e: Column): Column = Column.fn("upper", e) + /** + * Converts the input `e` to a binary value based on the supplied `f`. The `f` can be a + * case-insensitive string literal of "hex", "utf-8", "utf8", or "base64". By default, the + * binary format for conversion is "hex" if `f` is omitted. The function returns NULL if at + * least one of the input parameters is NULL. 
+ * + * @group string_funcs + * @since 3.5.0 + */ + def to_binary(e: Column, f: Column): Column = Column.fn("to_binary", e, f) + + /** + * Converts the input `e` to a binary value based on the format "hex". The function returns NULL + * if at least one of the input parameters is NULL. + * + * @group string_funcs + * @since 3.5.0 + */ + def to_binary(e: Column): Column = Column.fn("to_binary", e) + + /** + * Convert `e` to a string based on the `format`. Throws an exception if the conversion fails. + * + * @param e + * A column of number to be converted + * @param format + * The format can consist of the following characters, case insensitive: <ul> <li> '0' or '9': + * Specifies an expected digit between 0 and 9. A sequence of 0 or 9 in the format string + * matches a sequence of digits in the input value, generating a result string of the same + * length as the corresponding sequence in the format string. The result string is left-padded + * with zeros if the 0/9 sequence comprises more digits than the matching part of the decimal + * value, starts with 0, and is before the decimal point. Otherwise, it is padded with + * spaces.</li> <li>'.' or 'D': Specifies the position of the decimal point (optional, only + * allowed once).</li> <li>',' or 'G': Specifies the position of the grouping (thousands) + * separator (,). There must be a 0 or 9 to the left and right of each grouping + * separator.</li> <li>'$': Specifies the location of the $ currency sign. This character may + * only be specified once.</li> <li>'S' or 'MI': Specifies the position of a '-' or '+' sign + * (optional, only allowed once at the beginning or end of the format string). 
Note that 'S' + * prints '+' for positive values but 'MI' prints a space.</li> <li>'PR': Only allowed at the + * end of the format string; specifies that the result string will be wrapped by angle + * brackets if the input value is negative.</li> </ul> + * + * @group string_funcs + * @since 3.5.0 + */ + def to_char(e: Column, format: Column): Column = Column.fn("to_char", e, format) + + /** + * Convert string 'e' to a number based on the string format 'format'. Throws an exception if + * the conversion fails. + * + * @param e + * A column of string to be converted + * @param format + * The format can consist of the following characters, case insensitive: <ul><li> '0' or '9': + * Specifies an expected digit between 0 and 9. A sequence of 0 or 9 in the format string + * matches a sequence of digits in the input string. If the 0/9 sequence starts with 0 and is + * before the decimal point, it can only match a digit sequence of the same size. Otherwise, + * if the sequence starts with 9 or is after the decimal point, it can match a digit sequence + * that has the same or smaller size.</li> <li>'.' or 'D': Specifies the position of the + * decimal point (optional, only allowed once).</li> <li>',' or 'G': Specifies the position of + * the grouping (thousands) separator (,). There must be a 0 or 9 to the left and right of + * each grouping separator. 'expr' must match the grouping separator relevant for the size of + * the number.</li> <li>'$': Specifies the location of the $ currency sign. This character may + * only be specified once.</li> <li>'S' or 'MI': Specifies the position of a '-' or '+' sign + * (optional, only allowed once at the beginning or end of the format string). 
Note that 'S' + * allows '-' but 'MI' does not.</li> <li>'PR': Only allowed at the end of the format string; + * specifies that 'expr' indicates a negative number with wrapping angled brackets.</li></ul> + * + * @group string_funcs + * @since 3.5.0 + */ + def to_number(e: Column, format: Column): Column = Column.fn("to_number", e, format) + ////////////////////////////////////////////////////////////////////////////////////////////// // DateTime functions ////////////////////////////////////////////////////////////////////////////////////////////// @@ -4266,6 +4368,64 @@ object functions { */ def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e) + /** + * Parses the `timestamp` expression with the `format` expression to a timestamp with local time + * zone. Returns null with invalid input. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_timestamp_ltz(timestamp: Column, format: Column): Column = + Column.fn("to_timestamp_ltz", timestamp, format) + + /** + * Parses the `timestamp` expression with the default format to a timestamp with local time zone. + * The default format follows casting rules to a timestamp. Returns null with invalid input. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_timestamp_ltz(timestamp: Column): Column = + Column.fn("to_timestamp_ltz", timestamp) + + /** + * Parses the `timestamp` expression with the `format` expression to a timestamp without time + * zone. Returns null with invalid input. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_timestamp_ntz(timestamp: Column, format: Column): Column = + Column.fn("to_timestamp_ntz", timestamp, format) + + /** + * Parses the `timestamp` expression with the default format to a timestamp without time zone. + * The default format follows casting rules to a timestamp. Returns null with invalid input. 
+ * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_timestamp_ntz(timestamp: Column): Column = + Column.fn("to_timestamp_ntz", timestamp) + + /** + * Returns the UNIX timestamp of the given time. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_unix_timestamp(timeExp: Column, format: Column): Column = + Column.fn("to_unix_timestamp", timeExp, format) + + /** + * Returns the UNIX timestamp of the given time. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_unix_timestamp(timeExp: Column): Column = + Column.fn("to_unix_timestamp", timeExp) + ////////////////////////////////////////////////////////////////////////////////////////////// // Collection functions ////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index 18223c56588..dba95e4da37 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -2084,6 +2084,58 @@ class PlanGenerationTestSuite fn.to_csv(fn.col("d"), Collections.singletonMap("sep", "|")) } + functionTest("str_to_map") { + fn.str_to_map(fn.col("g")) + } + + functionTest("str_to_map with pair delimiter") { + fn.str_to_map(fn.col("g"), lit(","), lit("=")) + } + + functionTest("str_to_map with pair and keyValue delimiter") { + fn.str_to_map(fn.col("g"), lit(",")) + } + + functionTest("to_binary") { + fn.to_binary(fn.col("g")) + } + + functionTest("to_binary with format") { + fn.to_binary(fn.col("g"), lit("utf-8")) + } + + functionTest("to_char") { + fn.to_char(fn.col("b"), lit("$99.99")) + } + + functionTest("to_number") { + fn.to_char(fn.col("g"), lit("$99.99")) + } + + functionTest("to_timestamp_ltz") { + 
fn.to_timestamp_ltz(fn.col("g")) + } + + functionTest("to_timestamp_ltz with format") { + fn.to_timestamp_ltz(fn.col("g"), fn.col("g")) + } + + functionTest("to_timestamp_ntz") { + fn.to_timestamp_ntz(fn.col("g")) + } + + functionTest("to_timestamp_ntz with format") { + fn.to_timestamp_ntz(fn.col("g"), fn.col("g")) + } + + functionTest("to_unix_timestamp") { + fn.to_unix_timestamp(fn.col("g")) + } + + functionTest("to_unix_timestamp with format") { + fn.to_unix_timestamp(fn.col("g"), fn.col("g")) + } + test("groupby agg") { simple .groupBy(Column("id")) diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_str_to_map.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_str_to_map.explain new file mode 100644 index 00000000000..be3f2a584a8 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_str_to_map.explain @@ -0,0 +1,2 @@ +Project [str_to_map(g#0, ,, :) AS str_to_map(g, ,, :)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_str_to_map_with_pair_and_keyValue_delimiter.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_str_to_map_with_pair_and_keyValue_delimiter.explain new file mode 100644 index 00000000000..be3f2a584a8 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_str_to_map_with_pair_and_keyValue_delimiter.explain @@ -0,0 +1,2 @@ +Project [str_to_map(g#0, ,, :) AS str_to_map(g, ,, :)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_str_to_map_with_pair_delimiter.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_str_to_map_with_pair_delimiter.explain new file mode 100644 index 
00000000000..fd99a1ab536 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_str_to_map_with_pair_delimiter.explain @@ -0,0 +1,2 @@ +Project [str_to_map(g#0, ,, =) AS str_to_map(g, ,, =)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_binary.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_binary.explain new file mode 100644 index 00000000000..e1b99057a6d --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_binary.explain @@ -0,0 +1,2 @@ +Project [unhex(g#0, true) AS to_binary(g)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_binary_with_format.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_binary_with_format.explain new file mode 100644 index 00000000000..e9513f0103c --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_binary_with_format.explain @@ -0,0 +1,2 @@ +Project [encode(g#0, UTF-8) AS to_binary(g, utf-8)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_char.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_char.explain new file mode 100644 index 00000000000..f0d9cacc61a --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_char.explain @@ -0,0 +1,2 @@ +Project [to_char(cast(b#0 as decimal(30,15)), $99.99) AS to_char(b, $99.99)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_number.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_number.explain new file mode 100644 index 00000000000..79ece963928 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_number.explain @@ -0,0 +1,2 @@ +Project [to_char(cast(g#0 as decimal(38,18)), $99.99) AS to_char(g, $99.99)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ltz.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ltz.explain new file mode 100644 index 00000000000..18f838ee0cc --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ltz.explain @@ -0,0 +1,2 @@ +Project [cast(g#0 as timestamp) AS to_timestamp_ltz(g)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ltz_with_format.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ltz_with_format.explain new file mode 100644 index 00000000000..e212c8d51a6 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ltz_with_format.explain @@ -0,0 +1,2 @@ +Project [gettimestamp(g#0, g#0, TimestampType, Some(America/Los_Angeles), false) AS to_timestamp_ltz(g, g)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ntz.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ntz.explain new file mode 100644 index 00000000000..8a211403445 --- 
/dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ntz.explain @@ -0,0 +1,2 @@ +Project [cast(g#0 as timestamp_ntz) AS to_timestamp_ntz(g)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ntz_with_format.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ntz_with_format.explain new file mode 100644 index 00000000000..10ca240877f --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_timestamp_ntz_with_format.explain @@ -0,0 +1,2 @@ +Project [gettimestamp(g#0, g#0, TimestampNTZType, Some(America/Los_Angeles), false) AS to_timestamp_ntz(g, g)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_unix_timestamp.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_unix_timestamp.explain new file mode 100644 index 00000000000..7e2acf52436 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_unix_timestamp.explain @@ -0,0 +1,2 @@ +Project [to_unix_timestamp(g#0, yyyy-MM-dd HH:mm:ss, Some(America/Los_Angeles), false) AS to_unix_timestamp(g, yyyy-MM-dd HH:mm:ss)#0L] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_unix_timestamp_with_format.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_unix_timestamp_with_format.explain new file mode 100644 index 00000000000..aa3007e5fc6 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_unix_timestamp_with_format.explain @@ -0,0 +1,2 @@ +Project 
[to_unix_timestamp(g#0, g#0, Some(America/Los_Angeles), false) AS to_unix_timestamp(g, g)#0L] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map.json b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map.json new file mode 100644 index 00000000000..2cfd095f8fe --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map.json @@ -0,0 +1,25 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "str_to_map", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map.proto.bin new file mode 100644 index 00000000000..9732a829513 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_and_keyValue_delimiter.json b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_and_keyValue_delimiter.json new file mode 100644 index 00000000000..228c939a43e --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_and_keyValue_delimiter.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": 
{ + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "str_to_map", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "literal": { + "string": "," + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_and_keyValue_delimiter.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_and_keyValue_delimiter.proto.bin new file mode 100644 index 00000000000..069c15db9af Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_and_keyValue_delimiter.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_delimiter.json b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_delimiter.json new file mode 100644 index 00000000000..7e02c7f13d2 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_delimiter.json @@ -0,0 +1,33 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "str_to_map", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "literal": { + "string": "," + } + }, { + "literal": { + "string": "\u003d" + } + }] + } 
+ }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_delimiter.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_delimiter.proto.bin new file mode 100644 index 00000000000..86a9d15b651 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_str_to_map_with_pair_delimiter.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary.json new file mode 100644 index 00000000000..156c3a5b3ca --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary.json @@ -0,0 +1,25 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "to_binary", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary.proto.bin new file mode 100644 index 00000000000..a1da0e6e2ed Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary_with_format.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary_with_format.json new file mode 
100644 index 00000000000..8c78cc6f8b9 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary_with_format.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "to_binary", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "literal": { + "string": "utf-8" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary_with_format.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary_with_format.proto.bin new file mode 100644 index 00000000000..2f2364e5aba Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_to_binary_with_format.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_char.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_char.json new file mode 100644 index 00000000000..404a89a87ec --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_char.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "to_char", + "arguments": [{ + "unresolvedAttribute": { + 
"unparsedIdentifier": "b" + } + }, { + "literal": { + "string": "$99.99" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_char.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_char.proto.bin new file mode 100644 index 00000000000..087e212c39f Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_to_char.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_number.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_number.json new file mode 100644 index 00000000000..a39682de10f --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_number.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "to_char", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "literal": { + "string": "$99.99" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_number.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_number.proto.bin new file mode 100644 index 00000000000..86ab9d23572 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_to_number.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz.json 
b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz.json new file mode 100644 index 00000000000..59a79f39eb6 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz.json @@ -0,0 +1,25 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "to_timestamp_ltz", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz.proto.bin new file mode 100644 index 00000000000..9cabae3e756 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz_with_format.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz_with_format.json new file mode 100644 index 00000000000..08cb9c153f7 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz_with_format.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": 
[{ + "unresolvedFunction": { + "functionName": "to_timestamp_ltz", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz_with_format.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz_with_format.proto.bin new file mode 100644 index 00000000000..22fd3d07dfc Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ltz_with_format.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz.json new file mode 100644 index 00000000000..6808047ef20 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz.json @@ -0,0 +1,25 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "to_timestamp_ntz", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz.proto.bin new file mode 100644 index 00000000000..5cd4cfddbd1 Binary files /dev/null and 
b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz_with_format.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz_with_format.json new file mode 100644 index 00000000000..03e38801bfa --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz_with_format.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "to_timestamp_ntz", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz_with_format.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz_with_format.proto.bin new file mode 100644 index 00000000000..3a5d3dd9702 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_to_timestamp_ntz_with_format.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp.json new file mode 100644 index 00000000000..15a42b814a6 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp.json @@ -0,0 +1,25 @@ +{ + "common": { + 
"planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "to_unix_timestamp", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp.proto.bin new file mode 100644 index 00000000000..1c70f303e6f Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp_with_format.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp_with_format.json new file mode 100644 index 00000000000..d6f4280d446 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp_with_format.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "to_unix_timestamp", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp_with_format.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp_with_format.proto.bin new file mode 100644 index 00000000000..141ff1fa320 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_to_unix_timestamp_with_format.proto.bin differ diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index 3aa77971aa1..186ee0dce8d 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -140,7 +140,10 @@ Datetime Functions make_date from_unixtime unix_timestamp + to_unix_timestamp to_timestamp + to_timestamp_ltz + to_timestamp_ntz to_date trunc from_utc_timestamp @@ -221,6 +224,7 @@ Collection Functions map_concat from_csv schema_of_csv + str_to_map to_csv @@ -354,6 +358,9 @@ String Functions substring_index overlay sentences + to_binary + to_char + to_number translate trim upper diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py index 85863f2e115..442c00efb6b 100644 --- a/python/pyspark/sql/connect/functions.py +++ b/python/pyspark/sql/connect/functions.py @@ -1688,6 +1688,20 @@ def map_zip_with( map_zip_with.__doc__ = pysparkfuncs.map_zip_with.__doc__ +def str_to_map( + text: "ColumnOrName", + pairDelim: Optional["ColumnOrName"] = None, + keyValueDelim: Optional["ColumnOrName"] = None, +) -> Column: + _pairDelim = lit(",") if pairDelim is None else _to_col(pairDelim) + _keyValueDelim = lit(":") if keyValueDelim is None else _to_col(keyValueDelim) + + return _invoke_function("str_to_map", _to_col(text), _pairDelim, _keyValueDelim) + + +str_to_map.__doc__ = pysparkfuncs.str_to_map.__doc__ + + def posexplode(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("posexplode", col) @@ 
-2145,6 +2159,30 @@ def translate(srcCol: "ColumnOrName", matching: str, replace: str) -> Column: translate.__doc__ = pysparkfuncs.translate.__doc__ +def to_binary(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column: + if format is not None: + return _invoke_function_over_columns("to_binary", col, format) + else: + return _invoke_function_over_columns("to_binary", col) + + +to_binary.__doc__ = pysparkfuncs.to_binary.__doc__ + + +def to_char(col: "ColumnOrName", format: "ColumnOrName") -> Column: + return _invoke_function_over_columns("to_char", col, format) + + +to_char.__doc__ = pysparkfuncs.to_char.__doc__ + + +def to_number(col: "ColumnOrName", format: "ColumnOrName") -> Column: + return _invoke_function_over_columns("to_number", col, format) + + +to_number.__doc__ = pysparkfuncs.to_number.__doc__ + + # Date/Timestamp functions # TODO(SPARK-41455): Resolve dtypes inconsistencies for: # to_timestamp, from_utc_timestamp, to_utc_timestamp, @@ -2570,6 +2608,45 @@ def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str]) session_window.__doc__ = pysparkfuncs.session_window.__doc__ +def to_unix_timestamp( + timestamp: "ColumnOrName", + format: Optional["ColumnOrName"] = None, +) -> Column: + if format is not None: + return _invoke_function_over_columns("to_unix_timestamp", timestamp, format) + else: + return _invoke_function_over_columns("to_unix_timestamp", timestamp) + + +to_unix_timestamp.__doc__ = pysparkfuncs.to_unix_timestamp.__doc__ + + +def to_timestamp_ltz( + timestamp: "ColumnOrName", + format: Optional["ColumnOrName"] = None, +) -> Column: + if format is not None: + return _invoke_function_over_columns("to_timestamp_ltz", timestamp, format) + else: + return _invoke_function_over_columns("to_timestamp_ltz", timestamp) + + +to_timestamp_ltz.__doc__ = pysparkfuncs.to_timestamp_ltz.__doc__ + + +def to_timestamp_ntz( + timestamp: "ColumnOrName", + format: Optional["ColumnOrName"] = None, +) -> Column: + if format is not 
None: + return _invoke_function_over_columns("to_timestamp_ntz", timestamp, format) + else: + return _invoke_function_over_columns("to_timestamp_ntz", timestamp) + + +to_timestamp_ntz.__doc__ = pysparkfuncs.to_timestamp_ntz.__doc__ + + # Partition Transformation Functions diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 42b9eaf137e..6b03025b614 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -6379,6 +6379,116 @@ def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str]) return _invoke_function("session_window", time_col, gap_duration) +def to_unix_timestamp( + timestamp: "ColumnOrName", + format: Optional["ColumnOrName"] = None, +) -> Column: + """ + Returns the UNIX timestamp of the given time. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + timestamp : :class:`~pyspark.sql.Column` or str + Input column or strings. + format : :class:`~pyspark.sql.Column` or str, optional + format to use to convert UNIX timestamp values. 
+ + Examples + -------- + >>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") + >>> df = spark.createDataFrame([("2016-04-08",)], ["e"]) + >>> df.select(to_unix_timestamp(df.e, lit("yyyy-MM-dd")).alias('r')).collect() + [Row(r=1460098800)] + >>> spark.conf.unset("spark.sql.session.timeZone") + + >>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") + >>> df = spark.createDataFrame([("2016-04-08",)], ["e"]) + >>> df.select(to_unix_timestamp(df.e).alias('r')).collect() + [Row(r=None)] + >>> spark.conf.unset("spark.sql.session.timeZone") + """ + if format is not None: + return _invoke_function_over_columns("to_unix_timestamp", timestamp, format) + else: + return _invoke_function_over_columns("to_unix_timestamp", timestamp) + + +def to_timestamp_ltz( + timestamp: "ColumnOrName", + format: Optional["ColumnOrName"] = None, +) -> Column: + """ + Parses the `timestamp` with the `format` to a timestamp with local time zone. + Returns null with invalid input. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + timestamp : :class:`~pyspark.sql.Column` or str + Input column or strings. + format : :class:`~pyspark.sql.Column` or str, optional + format to use to convert type `TimestampType` timestamp values. 
+ + Examples + -------- + >>> spark.conf.set("spark.sql.session.timeZone", "UTC") + >>> df = spark.createDataFrame([("2016-12-31",)], ["e"]) + >>> df.select(to_timestamp_ltz(df.e, lit("yyyy-MM-dd")).alias('r')).collect() + [Row(r=datetime.datetime(2016, 12, 31, 0, 0))] + >>> spark.conf.unset("spark.sql.session.timeZone") + + >>> spark.conf.set("spark.sql.session.timeZone", "UTC") + >>> df = spark.createDataFrame([("2016-12-31",)], ["e"]) + >>> df.select(to_timestamp_ltz(df.e).alias('r')).collect() + [Row(r=datetime.datetime(2016, 12, 31, 0, 0))] + >>> spark.conf.unset("spark.sql.session.timeZone") + """ + if format is not None: + return _invoke_function_over_columns("to_timestamp_ltz", timestamp, format) + else: + return _invoke_function_over_columns("to_timestamp_ltz", timestamp) + + +def to_timestamp_ntz( + timestamp: "ColumnOrName", + format: Optional["ColumnOrName"] = None, +) -> Column: + """ + Parses the `timestamp` with the `format` to a timestamp without time zone. + Returns null with invalid input. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + timestamp : :class:`~pyspark.sql.Column` or str + Input column or strings. + format : :class:`~pyspark.sql.Column` or str, optional + format to use to convert type `TimestampNTZType` timestamp values. 
+ + Examples + -------- + >>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") + >>> df = spark.createDataFrame([("2016-04-08",)], ["e"]) + >>> df.select(to_timestamp_ntz(df.e, lit("yyyy-MM-dd")).alias('r')).collect() + [Row(r=datetime.datetime(2016, 4, 8, 0, 0))] + >>> spark.conf.unset("spark.sql.session.timeZone") + + >>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") + >>> df = spark.createDataFrame([("2016-04-08",)], ["e"]) + >>> df.select(to_timestamp_ntz(df.e).alias('r')).collect() + [Row(r=datetime.datetime(2016, 4, 8, 0, 0))] + >>> spark.conf.unset("spark.sql.session.timeZone") + """ + if format is not None: + return _invoke_function_over_columns("to_timestamp_ntz", timestamp, format) + else: + return _invoke_function_over_columns("to_timestamp_ntz", timestamp) + + # ---------------------------- misc functions ---------------------------------- @@ -7936,6 +8046,119 @@ def translate(srcCol: "ColumnOrName", matching: str, replace: str) -> Column: return _invoke_function("translate", _to_java_column(srcCol), matching, replace) +@try_remote_functions +def to_binary(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column: + """ + Converts the input `col` to a binary value based on the supplied `format`. + The `format` can be a case-insensitive string literal of "hex", "utf-8", "utf8", + or "base64". By default, the binary format for conversion is "hex" if + `format` is omitted. The function returns NULL if at least one of the + input parameters is NULL. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + Input column or strings. + format : :class:`~pyspark.sql.Column` or str, optional + format to use to convert binary values. 
+ + Examples + -------- + >>> df = spark.createDataFrame([("abc",)], ["e"]) + >>> df.select(to_binary(df.e, lit("utf-8")).alias('r')).collect() + [Row(r=bytearray(b'abc'))] + + >>> df = spark.createDataFrame([("414243",)], ["e"]) + >>> df.select(to_binary(df.e).alias('r')).collect() + [Row(r=bytearray(b'ABC'))] + """ + if format is not None: + return _invoke_function_over_columns("to_binary", col, format) + else: + return _invoke_function_over_columns("to_binary", col) + + +def to_char(col: "ColumnOrName", format: "ColumnOrName") -> Column: + """ + Convert `col` to a string based on the `format`. + Throws an exception if the conversion fails. The format can consist of the following + characters, case insensitive: + '0' or '9': Specifies an expected digit between 0 and 9. A sequence of 0 or 9 in the + format string matches a sequence of digits in the input value, generating a result + string of the same length as the corresponding sequence in the format string. + The result string is left-padded with zeros if the 0/9 sequence comprises more digits + than the matching part of the decimal value, starts with 0, and is before the decimal + point. Otherwise, it is padded with spaces. + '.' or 'D': Specifies the position of the decimal point (optional, only allowed once). + ',' or 'G': Specifies the position of the grouping (thousands) separator (,). + There must be a 0 or 9 to the left and right of each grouping separator. + '$': Specifies the location of the $ currency sign. This character may only be specified once. + 'S' or 'MI': Specifies the position of a '-' or '+' sign (optional, only allowed once at + the beginning or end of the format string). Note that 'S' prints '+' for positive + values but 'MI' prints a space. + 'PR': Only allowed at the end of the format string; specifies that the result string + will be wrapped by angle brackets if the input value is negative. + + .. 
versionadded:: 3.5.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + Input column or strings. + format : :class:`~pyspark.sql.Column` or str + format to use to convert char values. + + Examples + -------- + >>> df = spark.createDataFrame([(78.12,)], ["e"]) + >>> df.select(to_char(df.e, lit("$99.99")).alias('r')).collect() + [Row(r='$78.12')] + """ + return _invoke_function_over_columns("to_char", col, format) + + +def to_number(col: "ColumnOrName", format: "ColumnOrName") -> Column: + """ + Convert string 'col' to a number based on the string format 'format'. + Throws an exception if the conversion fails. The format can consist of the following + characters, case insensitive: + '0' or '9': Specifies an expected digit between 0 and 9. A sequence of 0 or 9 in the + format string matches a sequence of digits in the input string. If the 0/9 + sequence starts with 0 and is before the decimal point, it can only match a digit + sequence of the same size. Otherwise, if the sequence starts with 9 or is after + the decimal point, it can match a digit sequence that has the same or smaller size. + '.' or 'D': Specifies the position of the decimal point (optional, only allowed once). + ',' or 'G': Specifies the position of the grouping (thousands) separator (,). + There must be a 0 or 9 to the left and right of each grouping separator. + 'col' must match the grouping separator relevant for the size of the number. + '$': Specifies the location of the $ currency sign. This character may only be + specified once. + 'S' or 'MI': Specifies the position of a '-' or '+' sign (optional, only allowed + once at the beginning or end of the format string). Note that 'S' allows '-' + but 'MI' does not. + 'PR': Only allowed at the end of the format string; specifies that 'col' indicates a + negative number with wrapping angled brackets. + + .. 
versionadded:: 3.5.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + Input column or strings. + format : :class:`~pyspark.sql.Column` or str + format to use to convert number values. + + Examples + -------- + >>> df = spark.createDataFrame([("$78.12",)], ["e"]) + >>> df.select(to_number(df.e, lit("$99.99")).alias('r')).collect() + [Row(r=Decimal('78.12'))] + """ + return _invoke_function_over_columns("to_number", col, format) + + # ---------------------- Collection functions ------------------------------ @@ -10636,6 +10859,47 @@ def map_zip_with( return _invoke_higher_order_function("MapZipWith", [col1, col2], [f]) +def str_to_map( + text: "ColumnOrName", + pairDelim: Optional["ColumnOrName"] = None, + keyValueDelim: Optional["ColumnOrName"] = None, +) -> Column: + """ + Creates a map after splitting the text into key/value pairs using delimiters. + Both `pairDelim` and `keyValueDelim` are treated as regular expressions. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + text : :class:`~pyspark.sql.Column` or str + Input column or strings. + pairDelim : :class:`~pyspark.sql.Column` or str, optional + delimiter to use to split pair. + keyValueDelim : :class:`~pyspark.sql.Column` or str, optional + delimiter to use to split key/value. 
+ + Examples + -------- + >>> df = spark.createDataFrame([("a:1,b:2,c:3",)], ["e"]) + >>> df.select(str_to_map(df.e, lit(","), lit(":")).alias('r')).collect() + [Row(r={'a': '1', 'b': '2', 'c': '3'})] + + >>> df = spark.createDataFrame([("a:1,b:2,c:3",)], ["e"]) + >>> df.select(str_to_map(df.e, lit(",")).alias('r')).collect() + [Row(r={'a': '1', 'b': '2', 'c': '3'})] + + >>> df = spark.createDataFrame([("a:1,b:2,c:3",)], ["e"]) + >>> df.select(str_to_map(df.e).alias('r')).collect() + [Row(r={'a': '1', 'b': '2', 'c': '3'})] + """ + if pairDelim is None: + pairDelim = lit(",") + if keyValueDelim is None: + keyValueDelim = lit(":") + return _invoke_function_over_columns("str_to_map", text, pairDelim, keyValueDelim) + + # ---------------------- Partition transform functions -------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index ab14d4eb955..4e698d362d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -1461,6 +1461,38 @@ object functions { MapFromArrays(keys.expr, values.expr) } + /** + * Creates a map after splitting the text into key/value pairs using delimiters. + * Both `pairDelim` and `keyValueDelim` are treated as regular expressions. + * + * @group map_funcs + * @since 3.5.0 + */ + def str_to_map(text: Column, pairDelim: Column, keyValueDelim: Column): Column = withExpr { + StringToMap(text.expr, pairDelim.expr, keyValueDelim.expr) + } + + /** + * Creates a map after splitting the text into key/value pairs using delimiters. + * The `pairDelim` is treated as regular expressions. + * + * @group map_funcs + * @since 3.5.0 + */ + def str_to_map(text: Column, pairDelim: Column): Column = withExpr { + new StringToMap(text.expr, pairDelim.expr) + } + + /** + * Creates a map after splitting the text into key/value pairs using delimiters. 
+ * + * @group map_funcs + * @since 3.5.0 + */ + def str_to_map(text: Column): Column = withExpr { + new StringToMap(text.expr) + } + /** * Marks a DataFrame as small enough for use in broadcast joins. * @@ -3452,6 +3484,85 @@ object functions { */ def upper(e: Column): Column = withExpr { Upper(e.expr) } + /** + * Converts the input `e` to a binary value based on the supplied `format`. + * The `format` can be a case-insensitive string literal of "hex", "utf-8", "utf8", or "base64". + * By default, the binary format for conversion is "hex" if `format` is omitted. + * The function returns NULL if at least one of the input parameters is NULL. + * + * @group string_funcs + * @since 3.5.0 + */ + def to_binary(e: Column, format: Column): Column = withExpr { + new ToBinary(e.expr, format.expr) + } + + /** + * Converts the input `e` to a binary value based on the default format "hex". + * The function returns NULL if at least one of the input parameters is NULL. + * + * @group string_funcs + * @since 3.5.0 + */ + def to_binary(e: Column): Column = withExpr { + new ToBinary(e.expr) + } + + /** + * Convert `e` to a string based on the `format`. + * Throws an exception if the conversion fails. The format can consist of the following + * characters, case insensitive: + * '0' or '9': Specifies an expected digit between 0 and 9. A sequence of 0 or 9 in the format + * string matches a sequence of digits in the input value, generating a result string of the + * same length as the corresponding sequence in the format string. The result string is + * left-padded with zeros if the 0/9 sequence comprises more digits than the matching part of + * the decimal value, starts with 0, and is before the decimal point. Otherwise, it is + * padded with spaces. + * '.' or 'D': Specifies the position of the decimal point (optional, only allowed once). + * ',' or 'G': Specifies the position of the grouping (thousands) separator (,). 
There must be + * a 0 or 9 to the left and right of each grouping separator. + * '$': Specifies the location of the $ currency sign. This character may only be specified + * once. + * 'S' or 'MI': Specifies the position of a '-' or '+' sign (optional, only allowed once at + * the beginning or end of the format string). Note that 'S' prints '+' for positive values + * but 'MI' prints a space. + * 'PR': Only allowed at the end of the format string; specifies that the result string will be + * wrapped by angle brackets if the input value is negative. + * + * @group string_funcs + * @since 3.5.0 + */ + def to_char(e: Column, format: Column): Column = withExpr { + ToCharacter(e.expr, format.expr) + } + + /** + * Convert string 'e' to a number based on the string format 'format'. + * Throws an exception if the conversion fails. The format can consist of the following + * characters, case insensitive: + * '0' or '9': Specifies an expected digit between 0 and 9. A sequence of 0 or 9 in the format + * string matches a sequence of digits in the input string. If the 0/9 sequence starts with + * 0 and is before the decimal point, it can only match a digit sequence of the same size. + * Otherwise, if the sequence starts with 9 or is after the decimal point, it can match a + * digit sequence that has the same or smaller size. + * '.' or 'D': Specifies the position of the decimal point (optional, only allowed once). + * ',' or 'G': Specifies the position of the grouping (thousands) separator (,). There must be + * a 0 or 9 to the left and right of each grouping separator. 'expr' must match the + * grouping separator relevant for the size of the number. + * '$': Specifies the location of the $ currency sign. This character may only be specified + * once. + * 'S' or 'MI': Specifies the position of a '-' or '+' sign (optional, only allowed once at + * the beginning or end of the format string). Note that 'S' allows '-' but 'MI' does not. 
+ * 'PR': Only allowed at the end of the format string; specifies that 'expr' indicates a + * negative number with wrapping angled brackets. + * + * @group string_funcs + * @since 3.5.0 + */ + def to_number(e: Column, format: Column): Column = withExpr { + ToNumber(e.expr, format.expr) + } + ////////////////////////////////////////////////////////////////////////////////////////////// // DateTime functions ////////////////////////////////////////////////////////////////////////////////////////////// @@ -4302,6 +4413,70 @@ object functions { SecondsToTimestamp(e.expr) } + /** + * Parses the `timestamp` expression with the `format` expression + * to a timestamp with local time zone. Returns null with invalid input. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_timestamp_ltz(timestamp: Column, format: Column): Column = withExpr { + ParseToTimestamp(timestamp.expr, Some(format.expr), TimestampType) + } + + /** + * Parses the `timestamp` expression with the default format to a timestamp with local time zone. + * The default format follows casting rules to a timestamp. Returns null with invalid input. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_timestamp_ltz(timestamp: Column): Column = withExpr { + ParseToTimestamp(timestamp.expr, None, TimestampType) + } + + /** + * Parses the `timestamp` expression with the `format` expression + * to a timestamp without time zone. Returns null with invalid input. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_timestamp_ntz(timestamp: Column, format: Column): Column = withExpr { + ParseToTimestamp(timestamp.expr, Some(format.expr), TimestampNTZType) + } + + /** + * Parses the `timestamp` expression with the default format to a timestamp without time zone. + * The default format follows casting rules to a timestamp. Returns null with invalid input. 
+ * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_timestamp_ntz(timestamp: Column): Column = withExpr { + ParseToTimestamp(timestamp.expr, None, TimestampNTZType) + } + + /** + * Returns the UNIX timestamp of the given time. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_unix_timestamp(e: Column, format: Column): Column = withExpr { + new ToUnixTimestamp(e.expr, format.expr) + } + + /** + * Returns the UNIX timestamp of the given time. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def to_unix_timestamp(e: Column): Column = withExpr { + new ToUnixTimestamp(e.expr) + } + ////////////////////////////////////////////////////////////////////////////////////////////// // Collection functions ////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 54717ff05b4..b0462042421 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -796,12 +796,23 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq( Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.select(to_unix_timestamp(col("ts"))), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq( Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.select(to_unix_timestamp(col("ss"))), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq( Row(secs(date1.getTime)), Row(secs(date2.getTime)))) + checkAnswer(df.select(to_unix_timestamp(col("d"), lit("$fmt"))), Seq( + 
Row(secs(date1.getTime)), Row(secs(date2.getTime)))) + checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq( Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.select(to_unix_timestamp(col("s"), lit(fmt))), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) val x1 = "2015-07-24 10:00:00" val x2 = "2015-25-07 02:02:02" @@ -828,6 +839,10 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { val invalid = df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd bb:HH:ss')") val e = intercept[IllegalArgumentException](invalid.collect()) assert(e.getMessage.contains('b')) + + val df3 = Seq("2016-04-08").toDF("a") + checkAnswer(df3.selectExpr("unix_timestamp(a)"), Seq(Row(null))) + checkAnswer(df3.select(unix_timestamp(col("a"))), Seq(Row(null))) } } } @@ -1039,4 +1054,24 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { checkTrunc("SECOND", "1961-04-12 00:01:02") checkTrunc("MINUTE", "1961-04-12 00:01:00") } + + test("to_timestamp_ltz") { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + val df = Seq("2012-11-30").toDF("d") + checkAnswer( + df.selectExpr("to_timestamp_ltz(d, 'yyyy-MM-dd')"), + df.select(to_timestamp_ltz(col("d"), lit("yyyy-MM-dd"))) + ) + } + } + + test("to_timestamp_ntz") { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + val df = Seq("1990-11-22").toDF("d") + checkAnswer( + df.selectExpr("to_timestamp_ntz(d, 'yyyy-MM-dd')"), + df.select(to_timestamp_ntz(col("d"), lit("yyyy-MM-dd"))) + ) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala index f612c5903dc..b44b13d4cfa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -619,6 +619,13 @@ class StringFunctionsSuite extends QueryTest with SharedSparkSession { Row(Map("a" -> "1", "b" -> 
"2", "c" -> "3")) ) ) + checkAnswer( + df1.select(str_to_map(col("a"), lit(","), lit("="))), + Seq( + Row(Map("a" -> "1", "b" -> "2")), + Row(Map("a" -> "1", "b" -> "2", "c" -> "3")) + ) + ) val df2 = Seq(("a:1,b:2,c:3", "y")).toDF("a", "b") @@ -626,11 +633,19 @@ class StringFunctionsSuite extends QueryTest with SharedSparkSession { df2.selectExpr("str_to_map(a)"), Seq(Row(Map("a" -> "1", "b" -> "2", "c" -> "3"))) ) + checkAnswer( + df2.select(str_to_map(col("a"))), + Seq(Row(Map("a" -> "1", "b" -> "2", "c" -> "3"))) + ) checkAnswer( df2.selectExpr("str_to_map(a, ',')"), Seq(Row(Map("a" -> "1", "b" -> "2", "c" -> "3"))) ) + checkAnswer( + df2.select(str_to_map(col("a"), lit(","))), + Seq(Row(Map("a" -> "1", "b" -> "2", "c" -> "3"))) + ) val df3 = Seq( ("a=1&b=2", "&", "="), @@ -644,6 +659,13 @@ class StringFunctionsSuite extends QueryTest with SharedSparkSession { Row(Map("k" -> "2", "v" -> "3")) ) ) + checkAnswer( + df3.select(str_to_map(col("str"), col("delim1"), col("delim2"))), + Seq( + Row(Map("a" -> "1", "b" -> "2")), + Row(Map("k" -> "2", "v" -> "3")) + ) + ) val df4 = Seq( ("a:1&b:2", "&"), @@ -657,6 +679,13 @@ class StringFunctionsSuite extends QueryTest with SharedSparkSession { Row(Map("k" -> "2", "v" -> "3")) ) ) + checkAnswer( + df4.select(str_to_map(col("str"), col("delim1"))), + Seq( + Row(Map("a" -> "1", "b" -> "2")), + Row(Map("k" -> "2", "v" -> "3")) + ) + ) } test("SPARK-36148: check input data types of regexp_replace") { @@ -726,4 +755,36 @@ class StringFunctionsSuite extends QueryTest with SharedSparkSession { Row("QqQQdddoooo") :: Row(null) :: Nil ) } + + test("to_binary") { + val df = Seq("abc").toDF("a") + checkAnswer( + df.selectExpr("to_binary(a, 'utf-8')"), + df.select(to_binary(col("a"), lit("utf-8"))) + ) + } + + test("to_char") { + val df = Seq(78.12).toDF("a") + checkAnswer( + df.selectExpr("to_char(a, '$99.99')"), + Seq(Row("$78.12")) + ) + checkAnswer( + df.select(to_char(col("a"), lit("$99.99"))), + Seq(Row("$78.12")) + ) + } + + 
test("to_number") { + val df = Seq("$78.12").toDF("a") + checkAnswer( + df.selectExpr("to_number(a, '$99.99')"), + Seq(Row(78.12)) + ) + checkAnswer( + df.select(to_number(col("a"), lit("$99.99"))), + Seq(Row(78.12)) + ) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org