This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 68b30053f78 [SPARK-43939][CONNECT][PYTHON] Add try_* functions to Scala and Python 68b30053f78 is described below commit 68b30053f786e8178e6bdba736734e91adb51088 Author: panbingkun <pbk1...@gmail.com> AuthorDate: Wed Jun 21 00:38:22 2023 +0800 [SPARK-43939][CONNECT][PYTHON] Add try_* functions to Scala and Python ### What changes were proposed in this pull request? Add following functions: - try_add - try_avg - try_divide - try_element_at - try_multiply - try_subtract - try_sum - try_to_binary - try_to_number - try_to_timestamp to: - Scala API - Python API - Spark Connect Scala Client - Spark Connect Python Client ### Why are the changes needed? for parity ### Does this PR introduce _any_ user-facing change? Yes, new functions. ### How was this patch tested? - Add New UT. Closes #41653 from panbingkun/SPARK-43939. Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- .../scala/org/apache/spark/sql/functions.scala | 115 +++++++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 52 ++++ .../explain-results/function_try_add.explain | 2 + .../explain-results/function_try_avg.explain | 2 + .../explain-results/function_try_divide.explain | 2 + .../function_try_element_at_array.explain | 2 + .../function_try_element_at_map.explain | 2 + .../explain-results/function_try_multiply.explain | 2 + .../explain-results/function_try_subtract.explain | 2 + .../explain-results/function_try_sum.explain | 2 + .../explain-results/function_try_to_binary.explain | 2 + .../function_try_to_binary_without_format.explain | 2 + .../explain-results/function_try_to_number.explain | 2 + .../function_try_to_timestamp.explain | 2 + ...unction_try_to_timestamp_without_format.explain | 2 + .../query-tests/queries/function_try_add.json | 29 ++ .../query-tests/queries/function_try_add.proto.bin | Bin 0 -> 183 bytes .../query-tests/queries/function_try_avg.json | 25 ++ 
.../query-tests/queries/function_try_avg.proto.bin | Bin 0 -> 176 bytes .../query-tests/queries/function_try_divide.json | 29 ++ .../queries/function_try_divide.proto.bin | Bin 0 -> 186 bytes .../queries/function_try_element_at_array.json | 29 ++ .../function_try_element_at_array.proto.bin | Bin 0 -> 190 bytes .../queries/function_try_element_at_map.json | 29 ++ .../queries/function_try_element_at_map.proto.bin | Bin 0 -> 190 bytes .../query-tests/queries/function_try_multiply.json | 29 ++ .../queries/function_try_multiply.proto.bin | Bin 0 -> 188 bytes .../query-tests/queries/function_try_subtract.json | 29 ++ .../queries/function_try_subtract.proto.bin | Bin 0 -> 188 bytes .../query-tests/queries/function_try_sum.json | 25 ++ .../query-tests/queries/function_try_sum.proto.bin | Bin 0 -> 176 bytes .../queries/function_try_to_binary.json | 29 ++ .../queries/function_try_to_binary.proto.bin | Bin 0 -> 194 bytes .../function_try_to_binary_without_format.json | 25 ++ ...function_try_to_binary_without_format.proto.bin | Bin 0 -> 182 bytes .../queries/function_try_to_number.json | 29 ++ .../queries/function_try_to_number.proto.bin | Bin 0 -> 194 bytes .../queries/function_try_to_timestamp.json | 29 ++ .../queries/function_try_to_timestamp.proto.bin | Bin 0 -> 192 bytes .../function_try_to_timestamp_without_format.json | 25 ++ ...ction_try_to_timestamp_without_format.proto.bin | Bin 0 -> 185 bytes .../source/reference/pyspark.sql/functions.rst | 10 + python/pyspark/sql/connect/functions.py | 76 +++++ python/pyspark/sql/functions.py | 341 +++++++++++++++++++++ .../scala/org/apache/spark/sql/functions.scala | 137 +++++++++ .../org/apache/spark/sql/DateFunctionsSuite.scala | 11 + .../org/apache/spark/sql/MathFunctionsSuite.scala | 91 ++++++ .../apache/spark/sql/StringFunctionsSuite.scala | 17 + 48 files changed, 1237 insertions(+) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index a3f4a273661..d258abcecfa 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -1807,6 +1807,58 @@ object functions { */ def sqrt(colName: String): Column = sqrt(Column(colName)) + /** + * Returns the sum of `left` and `right` and the result is null on overflow. The acceptable + * input types are the same with the `+` operator. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_add(left: Column, right: Column): Column = Column.fn("try_add", left, right) + + /** + * Returns the mean calculated from values of a group and the result is null on overflow. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_avg(e: Column): Column = Column.fn("try_avg", e) + + /** + * Returns `dividend``/``divisor`. It always performs floating point division. Its result is + * always null if `divisor` is 0. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_divide(left: Column, right: Column): Column = Column.fn("try_divide", left, right) + + /** + * Returns `left``*``right` and the result is null on overflow. The acceptable input types are + * the same with the `*` operator. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_multiply(left: Column, right: Column): Column = Column.fn("try_multiply", left, right) + + /** + * Returns `left``-``right` and the result is null on overflow. The acceptable input types are + * the same with the `-` operator. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_subtract(left: Column, right: Column): Column = Column.fn("try_subtract", left, right) + + /** + * Returns the sum calculated from values of a group and the result is null on overflow. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_sum(e: Column): Column = Column.fn("try_sum", e) + /** * Creates a new struct column. 
If the input column is a column in a `DataFrame`, or a derived * column expression that is named (i.e. aliased), its name would be retained as the @@ -3990,6 +4042,34 @@ object functions { */ def btrim(str: Column, trim: Column): Column = Column.fn("btrim", str, trim) + /** + * This is a special version of `to_binary` that performs the same operation, but returns a NULL + * value instead of raising an error if the conversion cannot be performed. + * + * @group string_funcs + * @since 3.5.0 + */ + def try_to_binary(e: Column, f: Column): Column = Column.fn("try_to_binary", e, f) + + /** + * This is a special version of `to_binary` that performs the same operation, but returns a NULL + * value instead of raising an error if the conversion cannot be performed. + * + * @group string_funcs + * @since 3.5.0 + */ + def try_to_binary(e: Column): Column = Column.fn("try_to_binary", e) + + /** + * Convert string `e` to a number based on the string format `format`. Returns NULL if the + * string `e` does not match the expected format. The format follows the same semantics as the + * to_number function. + * + * @group string_funcs + * @since 3.5.0 + */ + def try_to_number(e: Column, format: Column): Column = Column.fn("try_to_number", e, format) + /** * Returns the character length of string data or number of bytes of binary data. The length of * string data includes the trailing spaces. The length of binary data includes binary zeros. @@ -4745,6 +4825,27 @@ object functions { */ def to_timestamp(s: Column, fmt: String): Column = Column.fn("to_timestamp", s, lit(fmt)) + /** + * Parses the `s` with the `format` to a timestamp. The function always returns null on an + * invalid input with`/`without ANSI SQL mode enabled. The result data type is consistent with + * the value of configuration `spark.sql.timestampType`. 
+ * + * @group datetime_funcs + * @since 3.5.0 + */ + def try_to_timestamp(s: Column, format: Column): Column = + Column.fn("try_to_timestamp", s, format) + + /** + * Parses the `s` expression to a timestamp. The function always returns null on an invalid + * input with`/`without ANSI SQL mode enabled. It follows casting rules to a timestamp. The + * result data type is consistent with the value of configuration `spark.sql.timestampType`. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def try_to_timestamp(s: Column): Column = Column.fn("try_to_timestamp", s) + /** * Converts the column into `DateType` by casting rules to `DateType`. * @@ -5321,6 +5422,20 @@ object functions { */ def element_at(column: Column, value: Any): Column = Column.fn("element_at", column, lit(value)) + /** + * (array, index) - Returns element of array at given (1-based) index. If Index is 0, Spark will + * throw an error. If index < 0, accesses elements from the last to the first. The function + * always returns NULL if the index exceeds the length of the array. + * + * (map, key) - Returns value for given key. The function always returns NULL if the key is not + * contained in the map. + * + * @group map_funcs + * @since 3.5.0 + */ + def try_element_at(column: Column, value: Column): Column = + Column.fn("try_element_at", column, value) + /** * Returns element of array at given (0-based) index. If the index points outside of the array * boundaries, then this function returns NULL. 
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index 2dae9a99146..53db026f340 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -1213,6 +1213,58 @@ class PlanGenerationTestSuite fn.sqrt("b") } + functionTest("try_add") { + fn.try_add(fn.col("a"), fn.col("a")) + } + + functionTest("try_avg") { + fn.try_avg(fn.col("a")) + } + + functionTest("try_divide") { + fn.try_divide(fn.col("a"), fn.col("a")) + } + + functionTest("try_multiply") { + fn.try_multiply(fn.col("a"), fn.col("a")) + } + + functionTest("try_subtract") { + fn.try_subtract(fn.col("a"), fn.col("a")) + } + + functionTest("try_sum") { + fn.try_sum(fn.col("a")) + } + + functionTest("try_to_timestamp") { + fn.try_to_timestamp(fn.col("g"), fn.col("g")) + } + + functionTest("try_to_timestamp without format") { + fn.try_to_timestamp(fn.col("g")) + } + + functionTest("try_to_binary") { + fn.try_to_binary(fn.col("g"), lit("format")) + } + + functionTest("try_to_binary without format") { + fn.try_to_binary(fn.col("g")) + } + + functionTest("try_to_number") { + fn.try_to_number(fn.col("g"), lit("99,999")) + } + + functionTest("try_element_at array") { + fn.try_element_at(fn.col("e"), fn.col("a")) + } + + functionTest("try_element_at map") { + fn.try_element_at(fn.col("f"), fn.col("g")) + } + functionTest("struct") { fn.struct("a", "d") } diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_add.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_add.explain new file mode 100644 index 00000000000..af718833dbf --- /dev/null +++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_add.explain @@ -0,0 +1,2 @@ +Project [(a#0 + a#0) AS try_add(a, a)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_avg.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_avg.explain new file mode 100644 index 00000000000..84c7065fbed --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_avg.explain @@ -0,0 +1,2 @@ +Aggregate [try_avg(a#0) AS try_avg(a)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_divide.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_divide.explain new file mode 100644 index 00000000000..03f59e9b957 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_divide.explain @@ -0,0 +1,2 @@ +Project [(cast(a#0 as double) / cast(a#0 as double)) AS try_divide(a, a)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_element_at_array.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_element_at_array.explain new file mode 100644 index 00000000000..20e67549c6d --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_element_at_array.explain @@ -0,0 +1,2 @@ +Project [element_at(e#0, a#0, None, false) AS try_element_at(e, a)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_element_at_map.explain 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_element_at_map.explain new file mode 100644 index 00000000000..aa2b736ada3 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_element_at_map.explain @@ -0,0 +1,2 @@ +Project [element_at(f#0, g#0, None, false) AS try_element_at(f, g)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_multiply.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_multiply.explain new file mode 100644 index 00000000000..855ecec2ca0 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_multiply.explain @@ -0,0 +1,2 @@ +Project [(a#0 * a#0) AS try_multiply(a, a)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_subtract.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_subtract.explain new file mode 100644 index 00000000000..4422fd91be7 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_subtract.explain @@ -0,0 +1,2 @@ +Project [(a#0 - a#0) AS try_subtract(a, a)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_sum.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_sum.explain new file mode 100644 index 00000000000..43d790d43b1 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_sum.explain @@ -0,0 +1,2 @@ +Aggregate [try_sum(a#0) AS try_sum(a)#0L] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_binary.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_binary.explain new file mode 100644 index 00000000000..ed7ed2348e2 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_binary.explain @@ -0,0 +1,2 @@ +Project [tryeval(null) AS try_to_binary(g, format)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_binary_without_format.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_binary_without_format.explain new file mode 100644 index 00000000000..b06903b8e24 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_binary_without_format.explain @@ -0,0 +1,2 @@ +Project [tryeval(unhex(g#0, true)) AS try_to_binary(g)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_number.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_number.explain new file mode 100644 index 00000000000..aabb9f60c47 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_number.explain @@ -0,0 +1,2 @@ +Project [try_to_number(g#0, 99,999) AS try_to_number(g, 99,999)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_timestamp.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_timestamp.explain new file mode 100644 index 00000000000..8074beab7db --- /dev/null +++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_timestamp.explain @@ -0,0 +1,2 @@ +Project [gettimestamp(g#0, g#0, TimestampType, Some(America/Los_Angeles), false) AS try_to_timestamp(g, g)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_timestamp_without_format.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_timestamp_without_format.explain new file mode 100644 index 00000000000..b1a43255217 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_try_to_timestamp_without_format.explain @@ -0,0 +1,2 @@ +Project [cast(g#0 as timestamp) AS try_to_timestamp(g)#0] ++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_add.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_add.json new file mode 100644 index 00000000000..80300b5b577 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_add.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_add", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_add.proto.bin 
b/connector/connect/common/src/test/resources/query-tests/queries/function_try_add.proto.bin new file mode 100644 index 00000000000..c1cb613b394 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_add.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_avg.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_avg.json new file mode 100644 index 00000000000..1216f4b5c63 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_avg.json @@ -0,0 +1,25 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_avg", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_avg.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_avg.proto.bin new file mode 100644 index 00000000000..8ab7a5d19e3 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_avg.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_divide.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_divide.json new file mode 100644 index 00000000000..d7d012756e6 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_divide.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + 
"planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_divide", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_divide.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_divide.proto.bin new file mode 100644 index 00000000000..05c8d4a193a Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_divide.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_array.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_array.json new file mode 100644 index 00000000000..c2651e4ad72 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_array.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_element_at", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "e" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }] + } + }] + } +} \ No newline at end of file diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_array.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_array.proto.bin new file mode 100644 index 00000000000..b86d5efd409 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_array.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_map.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_map.json new file mode 100644 index 00000000000..c4e5bc2f415 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_map.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_element_at", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "f" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_map.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_map.proto.bin new file mode 100644 index 00000000000..2f6c54f2fa5 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_element_at_map.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_multiply.json 
b/connector/connect/common/src/test/resources/query-tests/queries/function_try_multiply.json new file mode 100644 index 00000000000..df22654c820 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_multiply.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_multiply", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_multiply.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_multiply.proto.bin new file mode 100644 index 00000000000..8912423235e Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_multiply.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_subtract.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_subtract.json new file mode 100644 index 00000000000..f3a5df24cce --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_subtract.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + 
"unresolvedFunction": { + "functionName": "try_subtract", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_subtract.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_subtract.proto.bin new file mode 100644 index 00000000000..f0cb5f50278 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_subtract.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_sum.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_sum.json new file mode 100644 index 00000000000..41e93d1fcf9 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_sum.json @@ -0,0 +1,25 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_sum", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "a" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_sum.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_sum.proto.bin new file mode 100644 index 00000000000..dce7d9df359 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_sum.proto.bin differ diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary.json new file mode 100644 index 00000000000..9b57b6b26b5 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_to_binary", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "literal": { + "string": "format" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary.proto.bin new file mode 100644 index 00000000000..28b70591607 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary_without_format.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary_without_format.json new file mode 100644 index 00000000000..2498ff9a787 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary_without_format.json @@ -0,0 +1,25 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": 
"struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_to_binary", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary_without_format.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary_without_format.proto.bin new file mode 100644 index 00000000000..682eb1821a3 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_binary_without_format.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_number.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_number.json new file mode 100644 index 00000000000..44e894743df --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_number.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_to_number", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "literal": { + "string": "99,999" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_number.proto.bin 
b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_number.proto.bin new file mode 100644 index 00000000000..c2eba8a19d5 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_number.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp.json new file mode 100644 index 00000000000..d00967823a3 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp.json @@ -0,0 +1,29 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_to_timestamp", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp.proto.bin new file mode 100644 index 00000000000..4f0300d48a6 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp_without_format.json b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp_without_format.json new file mode 100644 index 00000000000..4fdfc38ca53 --- /dev/null +++ 
b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp_without_format.json @@ -0,0 +1,25 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": "try_to_timestamp", + "arguments": [{ + "unresolvedAttribute": { + "unparsedIdentifier": "g" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp_without_format.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp_without_format.proto.bin new file mode 100644 index 00000000000..91a4156e305 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_try_to_timestamp_without_format.proto.bin differ diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index 8195f06bea8..cf581f40e7a 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -104,6 +104,14 @@ Math Functions tan tanh toDegrees + try_add + try_avg + try_divide + try_multiply + try_subtract + try_sum + try_to_binary + try_to_number degrees toRadians radians @@ -171,6 +179,7 @@ Datetime Functions timestamp_micros timestamp_millis timestamp_seconds + try_to_timestamp unix_date unix_micros unix_millis @@ -246,6 +255,7 @@ Collection Functions schema_of_csv str_to_map to_csv + try_element_at Partition Transformation Functions diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py index de43d59773e..aaba75cc107 100644 --- 
a/python/pyspark/sql/connect/functions.py +++ b/python/pyspark/sql/connect/functions.py @@ -843,6 +843,48 @@ def sqrt(col: "ColumnOrName") -> Column: sqrt.__doc__ = pysparkfuncs.sqrt.__doc__ +def try_add(left: "ColumnOrName", right: "ColumnOrName") -> Column: + return _invoke_function_over_columns("try_add", left, right) + + +try_add.__doc__ = pysparkfuncs.try_add.__doc__ + + +def try_avg(col: "ColumnOrName") -> Column: + return _invoke_function_over_columns("try_avg", col) + + +try_avg.__doc__ = pysparkfuncs.try_avg.__doc__ + + +def try_divide(left: "ColumnOrName", right: "ColumnOrName") -> Column: + return _invoke_function_over_columns("try_divide", left, right) + + +try_divide.__doc__ = pysparkfuncs.try_divide.__doc__ + + +def try_multiply(left: "ColumnOrName", right: "ColumnOrName") -> Column: + return _invoke_function_over_columns("try_multiply", left, right) + + +try_multiply.__doc__ = pysparkfuncs.try_multiply.__doc__ + + +def try_subtract(left: "ColumnOrName", right: "ColumnOrName") -> Column: + return _invoke_function_over_columns("try_subtract", left, right) + + +try_subtract.__doc__ = pysparkfuncs.try_subtract.__doc__ + + +def try_sum(col: "ColumnOrName") -> Column: + return _invoke_function_over_columns("try_sum", col) + + +try_sum.__doc__ = pysparkfuncs.try_sum.__doc__ + + def tan(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("tan", col) @@ -1638,6 +1680,13 @@ def element_at(col: "ColumnOrName", extraction: Any) -> Column: element_at.__doc__ = pysparkfuncs.element_at.__doc__ +def try_element_at(col: "ColumnOrName", extraction: "ColumnOrName") -> Column: + return _invoke_function_over_columns("try_element_at", col, extraction) + + +try_element_at.__doc__ = pysparkfuncs.try_element_at.__doc__ + + def exists(col: "ColumnOrName", f: Callable[[Column], Column]) -> Column: return _invoke_higher_order_function("exists", [col], [f]) @@ -2497,6 +2546,23 @@ def char(col: "ColumnOrName") -> Column: char.__doc__ = pysparkfuncs.char.__doc__ 
+def try_to_binary(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column: + if format is not None: + return _invoke_function_over_columns("try_to_binary", col, format) + else: + return _invoke_function_over_columns("try_to_binary", col) + + +try_to_binary.__doc__ = pysparkfuncs.try_to_binary.__doc__ + + +def try_to_number(col: "ColumnOrName", format: "ColumnOrName") -> Column: + return _invoke_function_over_columns("try_to_number", col, format) + + +try_to_number.__doc__ = pysparkfuncs.try_to_number.__doc__ + + def btrim(str: "ColumnOrName", trim: Optional["ColumnOrName"] = None) -> Column: if trim is not None: return _invoke_function_over_columns("btrim", str, trim) @@ -2886,6 +2952,16 @@ def to_timestamp(col: "ColumnOrName", format: Optional[str] = None) -> Column: to_timestamp.__doc__ = pysparkfuncs.to_timestamp.__doc__ +def try_to_timestamp(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column: + if format is not None: + return _invoke_function_over_columns("try_to_timestamp", col, format) + else: + return _invoke_function_over_columns("try_to_timestamp", col) + + +try_to_timestamp.__doc__ = pysparkfuncs.try_to_timestamp.__doc__ + + def xpath(xml: "ColumnOrName", path: "ColumnOrName") -> Column: return _invoke_function_over_columns("xpath", xml, path) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 267c7e433bd..119a8b96bb4 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -350,6 +350,228 @@ def sqrt(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("sqrt", col) +@try_remote_functions +def try_add(left: "ColumnOrName", right: "ColumnOrName") -> Column: + """ + Returns the sum of `left` and `right` and the result is null on overflow. + The acceptable input types are the same with the `+` operator. + + ..
versionadded:: 3.5.0 + + Parameters + ---------- + left : :class:`~pyspark.sql.Column` or str + right : :class:`~pyspark.sql.Column` or str + + Examples + -------- + >>> df = spark.createDataFrame([(1982, 15), (1990, 2)], ["birth", "age"]) + >>> df.select(try_add(df.birth, df.age).alias('r')).collect() + [Row(r=1997), Row(r=1992)] + + >>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType + >>> schema = StructType([ + ... StructField("i", IntegerType(), True), + ... StructField("d", StringType(), True), + ... ]) + >>> df = spark.createDataFrame([(1, '2015-09-30')], schema) + >>> df = df.select(df.i, to_date(df.d).alias('d')) + >>> df.select(try_add(df.d, df.i).alias('r')).collect() + [Row(r=datetime.date(2015, 10, 1))] + + >>> df.select(try_add(df.d, make_interval(df.i)).alias('r')).collect() + [Row(r=datetime.date(2016, 9, 30))] + + >>> df.select( + ... try_add(df.d, make_interval(lit(0), lit(0), lit(0), df.i)).alias('r') + ... ).collect() + [Row(r=datetime.date(2015, 10, 1))] + + >>> df.select( + ... try_add(make_interval(df.i), make_interval(df.i)).alias('r') + ... ).show(truncate=False) + +-------+ + |r | + +-------+ + |2 years| + +-------+ + """ + return _invoke_function_over_columns("try_add", left, right) + + +@try_remote_functions +def try_avg(col: "ColumnOrName") -> Column: + """ + Returns the mean calculated from values of a group and the result is null on overflow. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + + Examples + -------- + >>> df = spark.createDataFrame([(1982, 15), (1990, 2)], ["birth", "age"]) + >>> df.select(try_avg(df.age).alias('r')).collect() + [Row(r=8.5)] + """ + return _invoke_function_over_columns("try_avg", col) + + +@try_remote_functions +def try_divide(left: "ColumnOrName", right: "ColumnOrName") -> Column: + """ + Returns `dividend`/`divisor`. It always performs floating point division. Its result is + always null if `divisor` is 0. + + .. 
versionadded:: 3.5.0 + + Parameters + ---------- + left : :class:`~pyspark.sql.Column` or str + dividend + right : :class:`~pyspark.sql.Column` or str + divisor + + Examples + -------- + >>> df = spark.createDataFrame([(6000, 15), (1990, 2)], ["a", "b"]) + >>> df.select(try_divide(df.a, df.b).alias('r')).collect() + [Row(r=400.0), Row(r=995.0)] + + >>> df = spark.createDataFrame([(1, 2)], ["year", "month"]) + >>> df.select( + ... try_divide(make_interval(df.year), df.month).alias('r') + ... ).show(truncate=False) + +--------+ + |r | + +--------+ + |6 months| + +--------+ + + >>> df.select( + ... try_divide(make_interval(df.year, df.month), lit(2)).alias('r') + ... ).show(truncate=False) + +--------+ + |r | + +--------+ + |7 months| + +--------+ + + >>> df.select( + ... try_divide(make_interval(df.year, df.month), lit(0)).alias('r') + ... ).show(truncate=False) + +----+ + |r | + +----+ + |NULL| + +----+ + """ + return _invoke_function_over_columns("try_divide", left, right) + + +@try_remote_functions +def try_multiply(left: "ColumnOrName", right: "ColumnOrName") -> Column: + """ + Returns `left`*`right` and the result is null on overflow. The acceptable input types are the + same with the `*` operator. + + .. 
versionadded:: 3.5.0 + + Parameters + ---------- + left : :class:`~pyspark.sql.Column` or str + multiplicand + right : :class:`~pyspark.sql.Column` or str + multiplier + + Examples + -------- + >>> df = spark.createDataFrame([(6000, 15), (1990, 2)], ["a", "b"]) + >>> df.select(try_multiply(df.a, df.b).alias('r')).collect() + [Row(r=90000), Row(r=3980)] + + >>> df = spark.createDataFrame([(2, 3),], ["a", "b"]) + >>> df.select(try_multiply(make_interval(df.a), df.b).alias('r')).show(truncate=False) + +-------+ + |r | + +-------+ + |6 years| + +-------+ + """ + return _invoke_function_over_columns("try_multiply", left, right) + + +@try_remote_functions +def try_subtract(left: "ColumnOrName", right: "ColumnOrName") -> Column: + """ + Returns `left`-`right` and the result is null on overflow. The acceptable input types are the + same with the `-` operator. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + left : :class:`~pyspark.sql.Column` or str + right : :class:`~pyspark.sql.Column` or str + + Examples + -------- + >>> df = spark.createDataFrame([(6000, 15), (1990, 2)], ["a", "b"]) + >>> df.select(try_subtract(df.a, df.b).alias('r')).collect() + [Row(r=5985), Row(r=1988)] + + >>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType + >>> schema = StructType([ + ... StructField("i", IntegerType(), True), + ... StructField("d", StringType(), True), + ... ]) + >>> df = spark.createDataFrame([(1, '2015-09-30')], schema) + >>> df = df.select(df.i, to_date(df.d).alias('d')) + >>> df.select(try_subtract(df.d, df.i).alias('r')).collect() + [Row(r=datetime.date(2015, 9, 29))] + + >>> df.select(try_subtract(df.d, make_interval(df.i)).alias('r')).collect() + [Row(r=datetime.date(2014, 9, 30))] + + >>> df.select( + ... try_subtract(df.d, make_interval(lit(0), lit(0), lit(0), df.i)).alias('r') + ... ).collect() + [Row(r=datetime.date(2015, 9, 29))] + + >>> df.select( + ... try_subtract(make_interval(df.i), make_interval(df.i)).alias('r') + ... 
).show(truncate=False) + +---------+ + |r | + +---------+ + |0 seconds| + +---------+ + """ + return _invoke_function_over_columns("try_subtract", left, right) + + +@try_remote_functions +def try_sum(col: "ColumnOrName") -> Column: + """ + Returns the sum calculated from values of a group and the result is null on overflow. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + + Examples + -------- + >>> df = spark.range(10) + >>> df.select(try_sum(df["id"]).alias('r')).collect() + [Row(r=45)] + """ + return _invoke_function_over_columns("try_sum", col) + + @try_remote_functions def abs(col: "ColumnOrName") -> Column: """ @@ -6615,6 +6837,36 @@ def to_timestamp(col: "ColumnOrName", format: Optional[str] = None) -> Column: return _invoke_function("to_timestamp", _to_java_column(col), format) +def try_to_timestamp(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column: + """ + Parses the `col` with the `format` to a timestamp. The function always + returns null on an invalid input with/without ANSI SQL mode enabled. The result data type is + consistent with the value of configuration `spark.sql.timestampType`. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + column values to convert. + format : :class:`~pyspark.sql.Column` or str, optional + format to use to convert timestamp values.
+ + Examples + -------- + >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t']) + >>> df.select(try_to_timestamp(df.t).alias('dt')).collect() + [Row(dt=datetime.datetime(1997, 2, 28, 10, 30))] + + >>> df.select(try_to_timestamp(df.t, lit('yyyy-MM-dd HH:mm:ss')).alias('dt')).collect() + [Row(dt=datetime.datetime(1997, 2, 28, 10, 30))] + """ + if format is not None: + return _invoke_function_over_columns("try_to_timestamp", col, format) + else: + return _invoke_function_over_columns("try_to_timestamp", col) + + @try_remote_functions def xpath(xml: "ColumnOrName", path: "ColumnOrName") -> Column: """ @@ -9957,6 +10209,61 @@ def chr(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("chr", col) +def try_to_binary(col: "ColumnOrName", format: Optional["ColumnOrName"] = None) -> Column: + """ + This is a special version of `to_binary` that performs the same operation, but returns a NULL + value instead of raising an error if the conversion cannot be performed. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + Input column or strings. + format : :class:`~pyspark.sql.Column` or str, optional + format to use to convert binary values. + + Examples + -------- + >>> df = spark.createDataFrame([("abc",)], ["e"]) + >>> df.select(try_to_binary(df.e, lit("utf-8")).alias('r')).collect() + [Row(r=bytearray(b'abc'))] + + >>> df = spark.createDataFrame([("414243",)], ["e"]) + >>> df.select(try_to_binary(df.e).alias('r')).collect() + [Row(r=bytearray(b'ABC'))] + """ + if format is not None: + return _invoke_function_over_columns("try_to_binary", col, format) + else: + return _invoke_function_over_columns("try_to_binary", col) + + +@try_remote_functions +def try_to_number(col: "ColumnOrName", format: "ColumnOrName") -> Column: + """ + Convert string 'col' to a number based on the string format `format`. Returns NULL if the + string 'col' does not match the expected format. 
The format follows the same semantics as the + to_number function. + + .. versionadded:: 3.5.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + Input column or strings. + format : :class:`~pyspark.sql.Column` or str + format to use to convert number values. + + Examples + -------- + >>> df = spark.createDataFrame([("$78.12",)], ["e"]) + >>> df.select(try_to_number(df.e, lit("$99.99")).alias('r')).collect() + [Row(r=Decimal('78.12'))] + """ + return _invoke_function_over_columns("try_to_number", col, format) + + @try_remote_functions def contains(left: "ColumnOrName", right: "ColumnOrName") -> Column: """ @@ -10638,6 +10945,40 @@ def element_at(col: "ColumnOrName", extraction: Any) -> Column: return _invoke_function_over_columns("element_at", col, lit(extraction)) +@try_remote_functions +def try_element_at(col: "ColumnOrName", extraction: "ColumnOrName") -> Column: + """ + (array, index) - Returns element of array at given (1-based) index. If Index is 0, Spark will + throw an error. If index < 0, accesses elements from the last to the first. The function + always returns NULL if the index exceeds the length of the array. + + (map, key) - Returns value for given key. The function always returns NULL if the key is not + contained in the map. + + ..
versionadded:: 3.5.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + name of column containing array or map + extraction : + index to check for in array or key to check for in map + + Examples + -------- + >>> df = spark.createDataFrame([(["a", "b", "c"],)], ['data']) + >>> df.select(try_element_at(df.data, lit(1)).alias('r')).collect() + [Row(r='a')] + >>> df.select(try_element_at(df.data, lit(-1)).alias('r')).collect() + [Row(r='c')] + + >>> df = spark.createDataFrame([({"a": 1.0, "b": 2.0},)], ['data']) + >>> df.select(try_element_at(df.data, lit("a")).alias('r')).collect() + [Row(r=1.0)] + """ + return _invoke_function_over_columns("try_element_at", col, extraction) + + @try_remote_functions def get(col: "ColumnOrName", index: Union["ColumnOrName", int]) -> Column: """ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 6c1b1262c53..41a3781d2ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -1872,6 +1872,70 @@ object functions { */ def sqrt(colName: String): Column = sqrt(Column(colName)) + /** + * Returns the sum of `left` and `right` and the result is null on overflow. The acceptable + * input types are the same with the `+` operator. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_add(left: Column, right: Column): Column = withExpr { + UnresolvedFunction("try_add", Seq(left.expr, right.expr), isDistinct = false) + } + + /** + * Returns the mean calculated from values of a group and the result is null on overflow. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_avg(e: Column): Column = withAggregateFunction { + Average(e.expr, EvalMode.TRY) + } + + /** + * Returns `dividend``/``divisor`. It always performs floating point division. Its result is + * always null if `divisor` is 0. 
+ * + * @group math_funcs + * @since 3.5.0 + */ + def try_divide(dividend: Column, divisor: Column): Column = withExpr { + UnresolvedFunction("try_divide", Seq(dividend.expr, divisor.expr), isDistinct = false) + } + + /** + * Returns `left``*``right` and the result is null on overflow. The acceptable input types are + * the same with the `*` operator. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_multiply(left: Column, right: Column): Column = withExpr { + UnresolvedFunction("try_multiply", Seq(left.expr, right.expr), isDistinct = false) + } + + /** + * Returns `left``-``right` and the result is null on overflow. The acceptable input types are + * the same with the `-` operator. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_subtract(left: Column, right: Column): Column = withExpr { + UnresolvedFunction("try_subtract", Seq(left.expr, right.expr), isDistinct = false) + } + + /** + * Returns the sum calculated from values of a group and the result is null on overflow. + * + * @group math_funcs + * @since 3.5.0 + */ + def try_sum(e: Column): Column = withAggregateFunction { + Sum(e.expr, EvalMode.TRY) + } + /** * Creates a new struct column. * If the input column is a column in a `DataFrame`, or a derived column expression @@ -4098,6 +4162,40 @@ object functions { new StringTrimBoth(str.expr, trim.expr) } + /** + * This is a special version of `to_binary` that performs the same operation, but returns a NULL + * value instead of raising an error if the conversion cannot be performed. + * + * @group string_funcs + * @since 3.5.0 + */ + def try_to_binary(e: Column, format: Column): Column = withExpr { + new TryToBinary(e.expr, format.expr) + } + + /** + * This is a special version of `to_binary` that performs the same operation, but returns a NULL + * value instead of raising an error if the conversion cannot be performed. 
+ * + * @group string_funcs + * @since 3.5.0 + */ + def try_to_binary(e: Column): Column = withExpr { + new TryToBinary(e.expr) + } + + /** + * Convert string `e` to a number based on the string format `format`. Returns NULL if the + * string `e` does not match the expected format. The format follows the same semantics as the + * to_number function. + * + * @group string_funcs + * @since 3.5.0 + */ + def try_to_number(e: Column, format: Column): Column = withExpr { + TryToNumber(e.expr, format.expr) + } + /** * Returns the character length of string data or number of bytes of binary data. * The length of string data includes the trailing spaces. @@ -4831,6 +4929,30 @@ object functions { new ParseToTimestamp(s.expr, Literal(fmt)) } + /** + * Parses the `s` with the `format` to a timestamp. The function always returns null on an + * invalid input with`/`without ANSI SQL mode enabled. The result data type is consistent with + * the value of configuration `spark.sql.timestampType`. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def try_to_timestamp(s: Column, format: Column): Column = withExpr { + new ParseToTimestamp(s.expr, format.expr) + } + + /** + * Parses the `s` to a timestamp. The function always returns null on an invalid + * input with`/`without ANSI SQL mode enabled. It follows casting rules to a timestamp. The + * result data type is consistent with the value of configuration `spark.sql.timestampType`. + * + * @group datetime_funcs + * @since 3.5.0 + */ + def try_to_timestamp(s: Column): Column = withExpr { + new ParseToTimestamp(s.expr) + } + /** * Converts the column into `DateType` by casting rules to `DateType`. * @@ -5442,6 +5564,21 @@ object functions { ElementAt(column.expr, lit(value).expr) } + /** + * (array, index) - Returns element of array at given (1-based) index. If Index is 0, Spark will + * throw an error. If index < 0, accesses elements from the last to the first. 
The function + * always returns NULL if the index exceeds the length of the array. + * + * (map, key) - Returns value for given key. The function always returns NULL if the key is not + * contained in the map. + * + * @group map_funcs + * @since 3.5.0 + */ + def try_element_at(column: Column, value: Column): Column = withExpr { + new TryElementAt(column.expr, value.expr) + } + /** * Returns element of array at given (0-based) index. If the index points * outside of the array boundaries, then this function returns NULL. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index d5d2fe8a5d3..a8c304ff66d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -1357,4 +1357,15 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { val result6 = df.select(make_ym_interval()) checkAnswer(result5, result6) } + + test("try_to_timestamp") { + val df = Seq(("2016-12-31", "yyyy-MM-dd")).toDF("a", "b") + val ts = Timestamp.valueOf("2016-12-31 00:00:00") + + checkAnswer(df.selectExpr("try_to_timestamp(a, b)"), Seq(Row(ts))) + checkAnswer(df.select(try_to_timestamp(col("a"), col("b"))), Seq(Row(ts))) + + checkAnswer(df.selectExpr("try_to_timestamp(a)"), Seq(Row(ts))) + checkAnswer(df.select(try_to_timestamp(col("a"))), Seq(Row(ts))) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MathFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MathFunctionsSuite.scala index fde55e27bf3..0adb89c3a9e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/MathFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/MathFunctionsSuite.scala @@ -18,7 +18,10 @@ package org.apache.spark.sql import java.nio.charset.StandardCharsets +import java.sql.Date +import java.text.SimpleDateFormat import java.time.Period +import 
java.util.Locale import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions.{log => logarithm} @@ -647,4 +650,92 @@ class MathFunctionsSuite extends QueryTest with SharedSparkSession { df1.select(width_bucket(col("v"), col("min"), col("max"), col("n"))) ) } + + val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) + val sdfDate = new SimpleDateFormat("yyyy-MM-dd", Locale.US) + val d = new Date(sdf.parse("2015-04-08 13:10:15").getTime) + + test("try_add") { + val df = Seq((1982, 15)).toDF("birth", "age") + + checkAnswer(df.selectExpr("try_add(birth, age)"), Seq(Row(1997))) + checkAnswer(df.select(try_add(col("birth"), col("age"))), Seq(Row(1997))) + + val d1 = Date.valueOf("2015-09-30") + val d2 = Date.valueOf("2016-02-29") + val df1 = Seq((1, d1), (2, d2)).toDF("i", "d") + + checkAnswer(df1.selectExpr("try_add(d, i)"), + df1.select(try_add(col("d"), col("i")))) + checkAnswer(df1.selectExpr(s"try_add(d, make_interval(i))"), + df1.select(try_add(column("d"), make_interval(col("i"))))) + checkAnswer(df1.selectExpr(s"try_add(d, make_interval(0, 0, 0, i))"), + df1.select(try_add(column("d"), make_interval(lit(0), lit(0), lit(0), col("i"))))) + checkAnswer(df1.selectExpr("try_add(make_interval(i), make_interval(i))"), + df1.select(try_add(make_interval(col("i")), make_interval(col("i"))))) + } + + test("try_avg") { + val df = Seq((1982, 15), (1990, 11)).toDF("birth", "age") + + checkAnswer(df.selectExpr("try_avg(age)"), Seq(Row(13))) + checkAnswer(df.select(try_avg(col("age"))), Seq(Row(13))) + } + + test("try_divide") { + val df = Seq((2000, 10), (2050, 5)).toDF("birth", "age") + + checkAnswer(df.selectExpr("try_divide(birth, age)"), Seq(Row(200.0), Row(410.0))) + checkAnswer(df.select(try_divide(col("birth"), col("age"))), Seq(Row(200.0), Row(410.0))) + + val df1 = Seq((1, 2)).toDF("year", "month") + + checkAnswer(df1.selectExpr(s"try_divide(make_interval(year, month), 2)"), + df1.select(try_divide(make_interval(col("year"), col("month")), 
lit(2)))) + checkAnswer(df1.selectExpr(s"try_divide(make_interval(year, month), 0)"), + df1.select(try_divide(make_interval(col("year"), col("month")), lit(0)))) + } + + test("try_element_at") { + val df = Seq((Array(1, 2, 3), 2)).toDF("a", "b") + checkAnswer(df.selectExpr("try_element_at(a, b)"), Seq(Row(2))) + checkAnswer(df.select(try_element_at(col("a"), col("b"))), Seq(Row(2))) + } + + test("try_multiply") { + val df = Seq((2, 3)).toDF("a", "b") + + checkAnswer(df.selectExpr("try_multiply(a, b)"), Seq(Row(6))) + checkAnswer(df.select(try_multiply(col("a"), col("b"))), Seq(Row(6))) + + checkAnswer(df.selectExpr("try_multiply(make_interval(a), b)"), + df.select(try_multiply(make_interval(col("a")), col("b")))) + } + + test("try_subtract") { + val df = Seq((2, 3)).toDF("a", "b") + + checkAnswer(df.selectExpr("try_subtract(a, b)"), Seq(Row(-1))) + checkAnswer(df.select(try_subtract(col("a"), col("b"))), Seq(Row(-1))) + + val d1 = Date.valueOf("2015-09-30") + val d2 = Date.valueOf("2016-02-29") + val df1 = Seq((1, d1), (2, d2)).toDF("i", "d") + + checkAnswer(df1.selectExpr("try_subtract(d, i)"), + df1.select(try_subtract(col("d"), col("i")))) + checkAnswer(df1.selectExpr(s"try_subtract(d, make_interval(i))"), + df1.select(try_subtract(col("d"), make_interval(col("i"))))) + checkAnswer(df1.selectExpr(s"try_subtract(d, make_interval(0, 0, 0, i))"), + df1.select(try_subtract(col("d"), make_interval(lit(0), lit(0), lit(0), col("i"))))) + checkAnswer(df1.selectExpr("try_subtract(make_interval(i), make_interval(i))"), + df1.select(try_subtract(make_interval(col("i")), make_interval(col("i"))))) + } + + test("try_sum") { + val df = Seq((2, 3), (5, 6)).toDF("a", "b") + + checkAnswer(df.selectExpr("try_sum(a)"), Seq(Row(7))) + checkAnswer(df.select(try_sum(col("a"))), Seq(Row(7))) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala index f64b5bc316e..34b1aacbc7c 
100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala @@ -1138,4 +1138,21 @@ class StringFunctionsSuite extends QueryTest with SharedSparkSession { checkAnswer(df.selectExpr("startswith(c, d)"), Row(true)) checkAnswer(df.select(startswith(col("c"), col("d"))), Row(true)) } + + test("try_to_binary") { + val df = Seq("abc").toDF("a") + + checkAnswer(df.selectExpr("try_to_binary(a, 'utf-8')"), + df.select(try_to_binary(col("a"), lit("utf-8")))) + + checkAnswer(df.selectExpr("try_to_binary(a)"), + df.select(try_to_binary(col("a")))) + } + + test("try_to_number") { + val df = Seq("$78.12").toDF("a") + + checkAnswer(df.selectExpr("try_to_number(a, '$99.99')"), Seq(Row(78.12))) + checkAnswer(df.select(try_to_number(col("a"), lit("$99.99"))), Seq(Row(78.12))) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org