This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ed9db14e8c7 [SPARK-40655][PYTHON][PROTOBUF] PySpark support for from_protobuf and to_protobuf
ed9db14e8c7 is described below

commit ed9db14e8c79f32a9a3420d908e449f48b555120
Author: SandishKumarHN <sanysand...@gmail.com>
AuthorDate: Wed Oct 19 12:04:25 2022 +0900

    [SPARK-40655][PYTHON][PROTOBUF] PySpark support for from_protobuf and to_protobuf
    
    From SandishKumarHN(sanysandishgmail.com) and Mohan Parthasarathy(mposdev21gmail.com)
    
    This PR follows main PR https://github.com/apache/spark/pull/37972
    
    The following is an example of how to use from_protobuf and to_protobuf in PySpark.
    
    ```python
    >>> import tempfile
    >>> data = [("1", (2, "Alice", 109200))]
    >>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
    >>> df = spark.createDataFrame(data, ddl_schema)
    >>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
    ...     '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
    ...     '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
    ...     '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
    ...     '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
    ...     '26F746F33')
    >>> # Writing a protobuf description into a file, generated by using
    >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
    >>> with tempfile.TemporaryDirectory() as tmp_dir:
    ...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
    ...     with open(desc_file_path, "wb") as f:
    ...         _ = f.write(bytearray.fromhex(desc_hex))
    ...         f.flush()
    ...         message_name = 'SimpleMessage'
    ...         proto_df = df.select(
    ...             to_protobuf(df.value, desc_file_path, message_name).alias("value"))
    ...         proto_df.show(truncate=False)
    ...         proto_df = proto_df.select(
    ...             from_protobuf(proto_df.value, desc_file_path, message_name).alias("value"))
    ...         proto_df.show(truncate=False)
    +----------------------------------------+
    |value                                   |
    +----------------------------------------+
    |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
    +----------------------------------------+
    +------------------+
    |value             |
    +------------------+
    |{2, Alice, 109200}|
    +------------------+
    ```
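
    As a minimal standalone sketch (not part of the committed code): the same round
    trip written as a plain script, assuming the spark-protobuf JAR is available to
    the session and that a descriptor file has already been generated with protoc;
    the `/tmp/pyspark_test.desc` path below is illustrative.

    ```python
    # Hypothetical usage sketch: only from_protobuf/to_protobuf and the
    # SimpleMessage schema come from this change; the paths are assumptions.
    from pyspark.sql import SparkSession
    from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

    spark = SparkSession.builder.appName("protobuf-round-trip").getOrCreate()
    df = spark.createDataFrame(
        [("1", (2, "Alice", 109200))],
        "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>",
    )

    desc_file_path = "/tmp/pyspark_test.desc"  # assumed pre-generated descriptor
    # Struct column -> Protobuf binary column, then back to a struct column.
    binary_df = df.select(
        to_protobuf(df.value, desc_file_path, "SimpleMessage").alias("value"))
    decoded_df = binary_df.select(
        from_protobuf(binary_df.value, desc_file_path, "SimpleMessage").alias("value"))
    decoded_df.show(truncate=False)
    ```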
    
    ### Tests Covered
    - from_protobuf / to_protobuf (functions.py)
    
    Closes #38212 from SandishKumarHN/PYSPARK_PROTOBUF.
    
    Authored-by: SandishKumarHN <sanysand...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .github/workflows/build_and_test.yml               |   2 +-
 .../src/test/resources/protobuf/pyspark_test.proto |  32 +++
 dev/sparktestsupport/modules.py                    |  14 +-
 dev/sparktestsupport/utils.py                      |  16 +-
 python/docs/source/reference/pyspark.sql/index.rst |   1 +
 .../pyspark.sql/{index.rst => protobuf.rst}        |  30 +--
 python/pyspark/sql/protobuf/__init__.py            |  18 ++
 python/pyspark/sql/protobuf/functions.py           | 215 +++++++++++++++++++++
 8 files changed, 296 insertions(+), 32 deletions(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index f6f5f026537..64dbe30012c 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -153,7 +153,7 @@ jobs:
             streaming, sql-kafka-0-10, streaming-kafka-0-10,
             mllib-local, mllib,
             yarn, mesos, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-            connect
+            connect, protobuf
         # Here, we split Hive and SQL tests into some of slow ones and the rest of them.
         included-tags: [""]
         excluded-tags: [""]
diff --git a/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto b/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
new file mode 100644
index 00000000000..8750371349a
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// TODO(SPARK-40777): Instead of saving .desc files in resources, generate during build.
+// To compile and create test class:
+// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
+// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/pyspark_test.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
+
+syntax = "proto3";
+
+package org.apache.spark.sql.protobuf;
+option java_outer_classname = "SimpleMessageProtos";
+
+
+message SimpleMessage {
+  int32 age = 1;
+  string name = 2;
+  int64  score = 3;
+}
\ No newline at end of file
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index e4a515d203c..2a427139148 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -282,6 +282,17 @@ connect = Module(
     ],
 )
 
+protobuf = Module(
+    name="protobuf",
+    dependencies=[sql],
+    source_file_regexes=[
+        "connector/protobuf",
+    ],
+    sbt_test_goals=[
+        "protobuf/test",
+    ],
+)
+
 sketch = Module(
     name="sketch",
     dependencies=[tags],
@@ -423,7 +434,7 @@ pyspark_core = Module(
 
 pyspark_sql = Module(
     name="pyspark-sql",
-    dependencies=[pyspark_core, hive, avro],
+    dependencies=[pyspark_core, hive, avro, protobuf],
     source_file_regexes=["python/pyspark/sql"],
     python_test_goals=[
         # doctests
@@ -443,6 +454,7 @@ pyspark_sql = Module(
         "pyspark.sql.udf",
         "pyspark.sql.window",
         "pyspark.sql.avro.functions",
+        "pyspark.sql.protobuf.functions",
         "pyspark.sql.pandas.conversion",
         "pyspark.sql.pandas.map_ops",
         "pyspark.sql.pandas.group_ops",
diff --git a/dev/sparktestsupport/utils.py b/dev/sparktestsupport/utils.py
index 11d64c4f0bc..37e023aaa63 100755
--- a/dev/sparktestsupport/utils.py
+++ b/dev/sparktestsupport/utils.py
@@ -108,23 +108,23 @@ def determine_modules_to_test(changed_modules, deduplicated=True):
     ['graphx', 'examples']
     >>> [x.name for x in determine_modules_to_test([modules.sql])]
     ... # doctest: +NORMALIZE_WHITESPACE
-    ['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'sql-kafka-0-10',
-     'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr', 'pyspark-connect',
-     'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml']
+    ['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'protobuf',
+     'sql-kafka-0-10', 'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr',
+     'pyspark-connect', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml']
     >>> sorted([x.name for x in determine_modules_to_test(
     ...     [modules.sparkr, modules.sql], deduplicated=False)])
     ... # doctest: +NORMALIZE_WHITESPACE
     ['avro', 'connect', 'docker-integration-tests', 'examples', 'hive', 'hive-thriftserver',
-     'mllib', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
+     'mllib', 'protobuf', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
      'pyspark-pandas-slow', 'pyspark-sql', 'repl', 'sparkr', 'sql', 'sql-kafka-0-10']
     >>> sorted([x.name for x in determine_modules_to_test(
     ...     [modules.sql, modules.core], deduplicated=False)])
     ... # doctest: +NORMALIZE_WHITESPACE
     ['avro', 'catalyst', 'connect', 'core', 'docker-integration-tests', 'examples', 'graphx',
-     'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'pyspark-connect', 'pyspark-core',
-     'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-resource',
-     'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql', 'sql-kafka-0-10',
-     'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
+     'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'protobuf', 'pyspark-connect',
+     'pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow',
+     'pyspark-resource', 'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql',
+     'sql-kafka-0-10', 'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
     """
     modules_to_test = set()
     for module in changed_modules:
diff --git a/python/docs/source/reference/pyspark.sql/index.rst b/python/docs/source/reference/pyspark.sql/index.rst
index 52aca086cb4..fc4569486a7 100644
--- a/python/docs/source/reference/pyspark.sql/index.rst
+++ b/python/docs/source/reference/pyspark.sql/index.rst
@@ -40,3 +40,4 @@ This page gives an overview of all public Spark SQL API.
     avro
     observation
     udf
+    protobuf
diff --git a/python/docs/source/reference/pyspark.sql/index.rst b/python/docs/source/reference/pyspark.sql/protobuf.rst
similarity index 72%
copy from python/docs/source/reference/pyspark.sql/index.rst
copy to python/docs/source/reference/pyspark.sql/protobuf.rst
index 52aca086cb4..0ba3d56c4c3 100644
--- a/python/docs/source/reference/pyspark.sql/index.rst
+++ b/python/docs/source/reference/pyspark.sql/protobuf.rst
@@ -16,27 +16,13 @@
     under the License.
 
 
-=========
-Spark SQL
-=========
+========
+Protobuf
+========
+.. currentmodule:: pyspark.sql.protobuf.functions
 
-This page gives an overview of all public Spark SQL API.
+.. autosummary::
+    :toctree: api/
 
-.. toctree::
-    :maxdepth: 2
-
-    core_classes
-    spark_session
-    configuration
-    io
-    dataframe
-    column
-    data_types
-    row
-    functions
-    window
-    grouping
-    catalog
-    avro
-    observation
-    udf
+    from_protobuf
+    to_protobuf
diff --git a/python/pyspark/sql/protobuf/__init__.py b/python/pyspark/sql/protobuf/__init__.py
new file mode 100644
index 00000000000..ac530a5495f
--- /dev/null
+++ b/python/pyspark/sql/protobuf/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["functions"]
diff --git a/python/pyspark/sql/protobuf/functions.py b/python/pyspark/sql/protobuf/functions.py
new file mode 100644
index 00000000000..9f8b90095df
--- /dev/null
+++ b/python/pyspark/sql/protobuf/functions.py
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A collection of built-in Protobuf functions
+"""
+
+
+from typing import Dict, Optional, TYPE_CHECKING
+from pyspark import SparkContext
+from pyspark.sql.column import Column, _to_java_column
+from pyspark.util import _print_missing_jar
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import ColumnOrName
+
+
+def from_protobuf(
+    data: "ColumnOrName",
+    descFilePath: str,
+    messageName: str,
+    options: Optional[Dict[str, str]] = None,
+) -> Column:
+    """
+    Converts a binary column of Protobuf format into its corresponding catalyst value.
+    The specified schema must match the read data, otherwise the behavior is undefined:
+    it may fail or return an arbitrary result.
+    To deserialize the data with a compatible and evolved schema, the expected
+    Protobuf schema can be set via the protobuf descriptor.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    data : :class:`~pyspark.sql.Column` or str
+        the binary column.
+    descFilePath : str
+        the path to the Protobuf descriptor file (Message GeneratedMessageV3 format).
+    messageName: str
+        the Protobuf message name to look for in the descriptor file.
+    options : dict, optional
+        options to control how the protobuf record is parsed.
+
+    Notes
+    -----
+    Protobuf functionality is provided as a pluggable external module.
+
+    Examples
+    --------
+    >>> import tempfile
+    >>> data = [("1", (2, "Alice", 109200))]
+    >>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
+    >>> df = spark.createDataFrame(data, ddl_schema)
+    >>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
+    ...    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
+    ...    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
+    ...    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
+    ...    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
+    ...    '26F746F33')
+    >>> # Writing a protobuf description into a file, generated by using
+    >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
+    >>> with tempfile.TemporaryDirectory() as tmp_dir:
+    ...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
+    ...     with open(desc_file_path, "wb") as f:
+    ...         _ = f.write(bytearray.fromhex(desc_hex))
+    ...         f.flush()
+    ...         message_name = 'SimpleMessage'
+    ...         proto_df = df.select(
+    ...             to_protobuf(df.value, desc_file_path, message_name).alias("value"))
+    ...         proto_df.show(truncate=False)
+    ...         proto_df = proto_df.select(
+    ...             from_protobuf(proto_df.value, desc_file_path, message_name).alias("value"))
+    ...         proto_df.show(truncate=False)
+    +----------------------------------------+
+    |value                                   |
+    +----------------------------------------+
+    |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
+    +----------------------------------------+
+    +------------------+
+    |value             |
+    +------------------+
+    |{2, Alice, 109200}|
+    +------------------+
+    """
+
+    sc = SparkContext._active_spark_context
+    assert sc is not None and sc._jvm is not None
+    try:
+        jc = sc._jvm.org.apache.spark.sql.protobuf.functions.from_protobuf(
+            _to_java_column(data), descFilePath, messageName, options or {}
+        )
+    except TypeError as e:
+        if str(e) == "'JavaPackage' object is not callable":
+            _print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version)
+        raise
+    return Column(jc)
+
+
+def to_protobuf(data: "ColumnOrName", descFilePath: str, messageName: str) -> Column:
+    """
+    Converts a column into binary of protobuf format.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    data : :class:`~pyspark.sql.Column` or str
+        the data column.
+    descFilePath : str
+        the path to the Protobuf descriptor file (Message GeneratedMessageV3 format).
+    messageName: str
+        the Protobuf message name to look for in the descriptor file.
+
+    Notes
+    -----
+    Protobuf functionality is provided as a pluggable external module.
+
+    Examples
+    --------
+    >>> import tempfile
+    >>> data = [([(2, "Alice", 13093020)])]
+    >>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>"
+    >>> df = spark.createDataFrame(data, ddl_schema)
+    >>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
+    ...    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
+    ...    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
+    ...    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
+    ...    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
+    ...    '26F746F33')
+    >>> # Writing a protobuf description into a file, generated by using
+    >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
+    >>> with tempfile.TemporaryDirectory() as tmp_dir:
+    ...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
+    ...     with open(desc_file_path, "wb") as f:
+    ...         _ = f.write(bytearray.fromhex(desc_hex))
+    ...         f.flush()
+    ...         message_name = 'SimpleMessage'
+    ...         proto_df = df.select(
+    ...             to_protobuf(df.value, desc_file_path, message_name).alias("suite"))
+    ...         proto_df.show(truncate=False)
+    +-------------------------------------------+
+    |suite                                      |
+    +-------------------------------------------+
+    |[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+    +-------------------------------------------+
+    """
+    sc = SparkContext._active_spark_context
+    assert sc is not None and sc._jvm is not None
+    try:
+        jc = sc._jvm.org.apache.spark.sql.protobuf.functions.to_protobuf(
+            _to_java_column(data), descFilePath, messageName
+        )
+    except TypeError as e:
+        if str(e) == "'JavaPackage' object is not callable":
+            _print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version)
+        raise
+    return Column(jc)
+
+
+def _test() -> None:
+    import os
+    import sys
+    from pyspark.testing.utils import search_jar
+
+    protobuf_jar = search_jar("connector/protobuf", "spark-protobuf-assembly-", "spark-protobuf")
+    if protobuf_jar is None:
+        print(
+            "Skipping all Protobuf Python tests as the optional Protobuf 
project was "
+            "not compiled into a JAR. To run these tests, "
+            "you need to build Spark with 'build/sbt package' or "
+            "'build/mvn package' before running this test."
+        )
+        sys.exit(0)
+    else:
+        existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
+        jars_args = "--jars %s" % protobuf_jar
+        os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
+
+    import doctest
+    from pyspark.sql import SparkSession
+    import pyspark.sql.protobuf.functions
+
+    globs = pyspark.sql.protobuf.functions.__dict__.copy()
+    spark = (
+        SparkSession.builder.master("local[2]")
+        .appName("sql.protobuf.functions tests")
+        .getOrCreate()
+    )
+    globs["spark"] = spark
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.sql.protobuf.functions,
+        globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
+    )
+    spark.stop()
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
