This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9c7908f348 [SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions
d9c7908f348 is described below

commit d9c7908f348fa7771182dca49fa032f6d1b689be
Author: Xinrong Meng <xinr...@apache.org>
AuthorDate: Wed Dec 7 12:06:57 2022 +0800

    [SPARK-41397][CONNECT][PYTHON] Implement part of string/binary functions
    
    ### What changes were proposed in this pull request?
    Implement the first half of string/binary functions. The rest of the string/binary functions will be implemented in a separate PR for easier review.
    
    ### Why are the changes needed?
    For API coverage on Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. New functions are available on Spark Connect.
    
    ### How was this patch tested?
    Unit tests.
    
    Closes #38921 from xinrong-meng/connect_func_string.
    
    Authored-by: Xinrong Meng <xinr...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/sql/connect/functions.py            | 346 +++++++++++++++++++++
 .../sql/tests/connect/test_connect_function.py     |  61 ++++
 2 files changed, 407 insertions(+)

diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index b576a092f99..e57ffd10462 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -3208,3 +3208,349 @@ def variance(col: "ColumnOrName") -> Column:
     +------------+
     """
     return var_samp(col)
+
+
+# String/Binary functions
+
+
+def upper(col: "ColumnOrName") -> Column:
+    """
+    Converts a string expression to upper case.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        upper case values.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"], "STRING")
+    >>> df.select(upper("value")).show()
+    +------------+
+    |upper(value)|
+    +------------+
+    |       SPARK|
+    |     PYSPARK|
+    |  PANDAS API|
+    +------------+
+    """
+    return _invoke_function_over_columns("upper", col)
+
+
+def lower(col: "ColumnOrName") -> Column:
+    """
+    Converts a string expression to lower case.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        lower case values.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"], "STRING")
+    >>> df.select(lower("value")).show()
+    +------------+
+    |lower(value)|
+    +------------+
+    |       spark|
+    |     pyspark|
+    |  pandas api|
+    +------------+
+    """
+    return _invoke_function_over_columns("lower", col)
+
+
+def ascii(col: "ColumnOrName") -> Column:
+    """
+    Computes the numeric value of the first character of the string column.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        numeric value.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"], "STRING")
+    >>> df.select(ascii("value")).show()
+    +------------+
+    |ascii(value)|
+    +------------+
+    |          83|
+    |          80|
+    |          80|
+    +------------+
+    """
+    return _invoke_function_over_columns("ascii", col)
+
+
+def base64(col: "ColumnOrName") -> Column:
+    """
+    Computes the BASE64 encoding of a binary column and returns it as a string column.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        BASE64 encoding of string value.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame(["Spark", "PySpark", "Pandas API"], "STRING")
+    >>> df.select(base64("value")).show()
+    +----------------+
+    |   base64(value)|
+    +----------------+
+    |        U3Bhcms=|
+    |    UHlTcGFyaw==|
+    |UGFuZGFzIEFQSQ==|
+    +----------------+
+    """
+    return _invoke_function_over_columns("base64", col)
+
+
+def unbase64(col: "ColumnOrName") -> Column:
+    """
+    Decodes a BASE64 encoded string column and returns it as a binary column.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        encoded string value.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame(["U3Bhcms=",
+    ...                             "UHlTcGFyaw==",
+    ...                             "UGFuZGFzIEFQSQ=="], "STRING")
+    >>> df.select(unbase64("value")).show()
+    +--------------------+
+    |     unbase64(value)|
+    +--------------------+
+    |    [53 70 61 72 6B]|
+    |[50 79 53 70 61 7...|
+    |[50 61 6E 64 61 7...|
+    +--------------------+
+    """
+    return _invoke_function_over_columns("unbase64", col)
+
+
+def ltrim(col: "ColumnOrName") -> Column:
+    """
+    Trim the spaces from left end for the specified string value.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        left trimmed values.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame(["   Spark", "Spark  ", " Spark"], "STRING")
+    >>> df.select(ltrim("value").alias("r")).withColumn("length", length("r")).show()
+    +-------+------+
+    |      r|length|
+    +-------+------+
+    |  Spark|     5|
+    |Spark  |     7|
+    |  Spark|     5|
+    +-------+------+
+    """
+    return _invoke_function_over_columns("ltrim", col)
+
+
+def rtrim(col: "ColumnOrName") -> Column:
+    """
+    Trim the spaces from right end for the specified string value.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        right trimmed values.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame(["   Spark", "Spark  ", " Spark"], "STRING")
+    >>> df.select(rtrim("value").alias("r")).withColumn("length", length("r")).show()
+    +--------+------+
+    |       r|length|
+    +--------+------+
+    |   Spark|     8|
+    |   Spark|     5|
+    |   Spark|     6|
+    +--------+------+
+    """
+    return _invoke_function_over_columns("rtrim", col)
+
+
+def trim(col: "ColumnOrName") -> Column:
+    """
+    Trim the spaces from both ends for the specified string column.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        trimmed values from both sides.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame(["   Spark", "Spark  ", " Spark"], "STRING")
+    >>> df.select(trim("value").alias("r")).withColumn("length", length("r")).show()
+    +-----+------+
+    |    r|length|
+    +-----+------+
+    |Spark|     5|
+    |Spark|     5|
+    |Spark|     5|
+    +-----+------+
+    """
+    return _invoke_function_over_columns("trim", col)
+
+
+def concat_ws(sep: str, *cols: "ColumnOrName") -> Column:
+    """
+    Concatenates multiple input string columns together into a single string column,
+    using the given separator.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    sep : str
+        words separator.
+    cols : :class:`~pyspark.sql.Column` or str
+        list of columns to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        string of concatenated words.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
+    >>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect()
+    [Row(s='abcd-123')]
+    """
+    return _invoke_function("concat_ws", lit(sep), *[_to_col(c) for c in cols])
+
+
+# TODO: enable with SPARK-41402
+# def decode(col: "ColumnOrName", charset: str) -> Column:
+#     """
+#     Computes the first argument into a string from a binary using the provided character set
+#     (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
+#
+#     .. versionadded:: 3.4.0
+#
+#     Parameters
+#     ----------
+#     col : :class:`~pyspark.sql.Column` or str
+#         target column to work on.
+#     charset : str
+#         charset to use to decode to.
+#
+#     Returns
+#     -------
+#     :class:`~pyspark.sql.Column`
+#         the column for computed results.
+#
+#     Examples
+#     --------
+#     >>> df = spark.createDataFrame([('abcd',)], ['a'])
+#     >>> df.select(decode("a", "UTF-8")).show()
+#     +----------------------+
+#     |stringdecode(a, UTF-8)|
+#     +----------------------+
+#     |                  abcd|
+#     +----------------------+
+#     """
+#     return _invoke_function("decode", _to_col(col), lit(charset))
+
+
+def encode(col: "ColumnOrName", charset: str) -> Column:
+    """
+    Computes the first argument into a binary from a string using the provided character set
+    (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+    charset : str
+        charset to use to encode.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the column for computed results.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([('abcd',)], ['c'])
+    >>> df.select(encode("c", "UTF-8")).show()
+    +----------------+
+    |encode(c, UTF-8)|
+    +----------------+
+    |   [61 62 63 64]|
+    +----------------+
+    """
+    return _invoke_function("encode", _to_col(col), lit(charset))
diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py
index 2f1e4968942..22f33ce0530 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -413,6 +413,67 @@ class SparkConnectFunctionTests(SparkConnectFuncTestCase):
             sdf.groupBy("a").agg(SF.percentile_approx(sdf.b, [0.1, 0.9])).toPandas(),
         )
 
+    def test_string_functions(self):
+        from pyspark.sql import functions as SF
+        from pyspark.sql.connect import functions as CF
+
+        query = """
+            SELECT * FROM VALUES
+            ('   ab   ', 'ab   ', NULL), ('   ab', NULL, 'ab')
+            AS tab(a, b, c)
+            """
+        # +--------+-----+----+
+        # |       a|    b|   c|
+        # +--------+-----+----+
+        # |   ab   |ab   |null|
+        # |      ab| null|  ab|
+        # +--------+-----+----+
+
+        cdf = self.connect.sql(query)
+        sdf = self.spark.sql(query)
+
+        for cfunc, sfunc in [
+            (CF.upper, SF.upper),
+            (CF.lower, SF.lower),
+            (CF.ascii, SF.ascii),
+            (CF.base64, SF.base64),
+            (CF.unbase64, SF.unbase64),
+            (CF.ltrim, SF.ltrim),
+            (CF.rtrim, SF.rtrim),
+            (CF.trim, SF.trim),
+        ]:
+            self.assert_eq(
+                cdf.select(cfunc("a"), cfunc(cdf.b)).toPandas(),
+                sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(),
+            )
+
+        self.assert_eq(
+            cdf.select(CF.concat_ws("-", cdf.a, "c")).toPandas(),
+            sdf.select(SF.concat_ws("-", sdf.a, "c")).toPandas(),
+        )
+
+        # Disable the test for "decode" because of inconsistent column names,
+        # as shown below
+        #
+        # >>> sdf.select(SF.decode("c", "UTF-8")).toPandas()
+        # stringdecode(c, UTF-8)
+        # 0                   None
+        # 1                     ab
+        # >>> cdf.select(CF.decode("c", "UTF-8")).toPandas()
+        # decode(c, UTF-8)
+        # 0             None
+        # 1               ab
+        #
+        # self.assert_eq(
+        #     cdf.select(CF.decode("c", "UTF-8")).toPandas(),
+        #     sdf.select(SF.decode("c", "UTF-8")).toPandas(),
+        # )
+
+        self.assert_eq(
+            cdf.select(CF.encode("c", "UTF-8")).toPandas(),
+            sdf.select(SF.encode("c", "UTF-8")).toPandas(),
+        )
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.connect.test_connect_function import *  # noqa: F401


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to