This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d9c7908f348 [SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions d9c7908f348 is described below commit d9c7908f348fa7771182dca49fa032f6d1b689be Author: Xinrong Meng <xinr...@apache.org> AuthorDate: Wed Dec 7 12:06:57 2022 +0800 [SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions ### What changes were proposed in this pull request? Implement the first half of string/binary functions. The rest of the string/binary functions will be implemented in a separate PR for easier review. ### Why are the changes needed? For API coverage on Spark Connect. ### Does this PR introduce _any_ user-facing change? Yes. New functions are available on Spark Connect. ### How was this patch tested? Unit tests. Closes #38921 from xinrong-meng/connect_func_string. Authored-by: Xinrong Meng <xinr...@apache.org> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- python/pyspark/sql/connect/functions.py | 346 +++++++++++++++++++++ .../sql/tests/connect/test_connect_function.py | 61 ++++ 2 files changed, 407 insertions(+) diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py index b576a092f99..e57ffd10462 100644 --- a/python/pyspark/sql/connect/functions.py +++ b/python/pyspark/sql/connect/functions.py @@ -3208,3 +3208,349 @@ def variance(col: "ColumnOrName") -> Column: +------------+ """ return var_samp(col) + + +# String/Binary functions + + +def upper(col: "ColumnOrName") -> Column: + """ + Converts a string expression to upper case. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + upper case values. 
+ + Examples + -------- + >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"], "STRING") + >>> df.select(upper("value")).show() + +------------+ + |upper(value)| + +------------+ + | SPARK| + | PYSPARK| + | PANDAS API| + +------------+ + """ + return _invoke_function_over_columns("upper", col) + + +def lower(col: "ColumnOrName") -> Column: + """ + Converts a string expression to lower case. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + lower case values. + + Examples + -------- + >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"], "STRING") + >>> df.select(lower("value")).show() + +------------+ + |lower(value)| + +------------+ + | spark| + | pyspark| + | pandas api| + +------------+ + """ + return _invoke_function_over_columns("lower", col) + + +def ascii(col: "ColumnOrName") -> Column: + """ + Computes the numeric value of the first character of the string column. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + numeric value. + + Examples + -------- + >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"], "STRING") + >>> df.select(ascii("value")).show() + +------------+ + |ascii(value)| + +------------+ + | 83| + | 80| + | 80| + +------------+ + """ + return _invoke_function_over_columns("ascii", col) + + +def base64(col: "ColumnOrName") -> Column: + """ + Computes the BASE64 encoding of a binary column and returns it as a string column. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + BASE64 encoding of string value. 
+ + Examples + -------- + >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"], "STRING") + >>> df.select(base64("value")).show() + +----------------+ + | base64(value)| + +----------------+ + | U3Bhcms=| + | UHlTcGFyaw==| + |UGFuZGFzIEFQSQ==| + +----------------+ + """ + return _invoke_function_over_columns("base64", col) + + +def unbase64(col: "ColumnOrName") -> Column: + """ + Decodes a BASE64 encoded string column and returns it as a binary column. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + decoded binary value. + + Examples + -------- + >>> df = spark.createDataFrame(["U3Bhcms=", + ... "UHlTcGFyaw==", + ... "UGFuZGFzIEFQSQ=="], "STRING") + >>> df.select(unbase64("value")).show() + +--------------------+ + | unbase64(value)| + +--------------------+ + | [53 70 61 72 6B]| + |[50 79 53 70 61 7...| + |[50 61 6E 64 61 7...| + +--------------------+ + """ + return _invoke_function_over_columns("unbase64", col) + + +def ltrim(col: "ColumnOrName") -> Column: + """ + Trim the spaces from left end for the specified string value. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + left trimmed values. + + Examples + -------- + >>> df = spark.createDataFrame([" Spark", "Spark ", " Spark"], "STRING") + >>> df.select(ltrim("value").alias("r")).withColumn("length", length("r")).show() + +-------+------+ + | r|length| + +-------+------+ + | Spark| 5| + |Spark | 7| + | Spark| 5| + +-------+------+ + """ + return _invoke_function_over_columns("ltrim", col) + + +def rtrim(col: "ColumnOrName") -> Column: + """ + Trim the spaces from right end for the specified string value. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. 
+ + Returns + ------- + :class:`~pyspark.sql.Column` + right trimmed values. + + Examples + -------- + >>> df = spark.createDataFrame([" Spark", "Spark ", " Spark"], "STRING") + >>> df.select(rtrim("value").alias("r")).withColumn("length", length("r")).show() + +--------+------+ + | r|length| + +--------+------+ + | Spark| 8| + | Spark| 5| + | Spark| 6| + +--------+------+ + """ + return _invoke_function_over_columns("rtrim", col) + + +def trim(col: "ColumnOrName") -> Column: + """ + Trim the spaces from both ends for the specified string column. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + trimmed values from both sides. + + Examples + -------- + >>> df = spark.createDataFrame([" Spark", "Spark ", " Spark"], "STRING") + >>> df.select(trim("value").alias("r")).withColumn("length", length("r")).show() + +-----+------+ + | r|length| + +-----+------+ + |Spark| 5| + |Spark| 5| + |Spark| 5| + +-----+------+ + """ + return _invoke_function_over_columns("trim", col) + + +def concat_ws(sep: str, *cols: "ColumnOrName") -> Column: + """ + Concatenates multiple input string columns together into a single string column, + using the given separator. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + sep : str + words separator. + cols : :class:`~pyspark.sql.Column` or str + list of columns to work on. + + Returns + ------- + :class:`~pyspark.sql.Column` + string of concatenated words. 
+ + Examples + -------- + >>> df = spark.createDataFrame([('abcd','123')], ['s', 'd']) + >>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect() + [Row(s='abcd-123')] + """ + return _invoke_function("concat_ws", lit(sep), *[_to_col(c) for c in cols]) + + +# TODO: enable with SPARK-41402 +# def decode(col: "ColumnOrName", charset: str) -> Column: +# """ +# Decodes the first argument from binary into a string using the provided character set +# (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'). +# +# .. versionadded:: 3.4.0 +# +# Parameters +# ---------- +# col : :class:`~pyspark.sql.Column` or str +# target column to work on. +# charset : str +# charset to use for decoding. +# +# Returns +# ------- +# :class:`~pyspark.sql.Column` +# the column for computed results. +# +# Examples +# -------- +# >>> df = spark.createDataFrame([('abcd',)], ['a']) +# >>> df.select(decode("a", "UTF-8")).show() +# +----------------------+ +# |stringdecode(a, UTF-8)| +# +----------------------+ +# | abcd| +# +----------------------+ +# """ +# return _invoke_function("decode", _to_col(col), lit(charset)) + + +def encode(col: "ColumnOrName", charset: str) -> Column: + """ + Encodes the first argument from a string into binary using the provided character set + (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'). + + .. versionadded:: 3.4.0 + + Parameters + ---------- + col : :class:`~pyspark.sql.Column` or str + target column to work on. + charset : str + charset to use to encode. + + Returns + ------- + :class:`~pyspark.sql.Column` + the column for computed results. 
+ + Examples + -------- + >>> df = spark.createDataFrame([('abcd',)], ['c']) + >>> df.select(encode("c", "UTF-8")).show() + +----------------+ + |encode(c, UTF-8)| + +----------------+ + | [61 62 63 64]| + +----------------+ + """ + return _invoke_function("encode", _to_col(col), lit(charset)) diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py index 2f1e4968942..22f33ce0530 100644 --- a/python/pyspark/sql/tests/connect/test_connect_function.py +++ b/python/pyspark/sql/tests/connect/test_connect_function.py @@ -413,6 +413,67 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase): sdf.groupBy("a").agg(SF.percentile_approx(sdf.b, [0.1, 0.9])).toPandas(), ) + def test_string_functions(self): + from pyspark.sql import functions as SF + from pyspark.sql.connect import functions as CF + + query = """ + SELECT * FROM VALUES + (' ab ', 'ab ', NULL), (' ab', NULL, 'ab') + AS tab(a, b, c) + """ + # +--------+-----+----+ + # | a| b| c| + # +--------+-----+----+ + # | ab |ab |null| + # | ab| null| ab| + # +--------+-----+----+ + + cdf = self.connect.sql(query) + sdf = self.spark.sql(query) + + for cfunc, sfunc in [ + (CF.upper, SF.upper), + (CF.lower, SF.lower), + (CF.ascii, SF.ascii), + (CF.base64, SF.base64), + (CF.unbase64, SF.unbase64), + (CF.ltrim, SF.ltrim), + (CF.rtrim, SF.rtrim), + (CF.trim, SF.trim), + ]: + self.assert_eq( + cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(), + sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(), + ) + + self.assert_eq( + cdf.select(CF.concat_ws("-", cdf.a, "c")).toPandas(), + sdf.select(SF.concat_ws("-", sdf.a, "c")).toPandas(), + ) + + # Disable the test for "decode" because of inconsistent column names, + # as shown below + # + # >>> sdf.select(SF.decode("c", "UTF-8")).toPandas() + # stringdecode(c, UTF-8) + # 0 None + # 1 ab + # >>> cdf.select(CF.decode("c", "UTF-8")).toPandas() + # decode(c, UTF-8) + # 0 None + # 1 ab + # + # self.assert_eq( + # 
cdf.select(CF.decode("c", "UTF-8")).toPandas(), + # sdf.select(SF.decode("c", "UTF-8")).toPandas(), + # ) + + self.assert_eq( + cdf.select(CF.encode("c", "UTF-8")).toPandas(), + sdf.select(SF.encode("c", "UTF-8")).toPandas(), + ) + if __name__ == "__main__": from pyspark.sql.tests.connect.test_connect_function import * # noqa: F401 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org