This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ed9db14e8c7 [SPARK-40655][PYTHON][PROTOBUF] PySpark support for from_protobuf and to_protobuf ed9db14e8c7 is described below commit ed9db14e8c79f32a9a3420d908e449f48b555120 Author: SandishKumarHN <sanysand...@gmail.com> AuthorDate: Wed Oct 19 12:04:25 2022 +0900 [SPARK-40655][PYTHON][PROTOBUF] PySpark support for from_protobuf and to_protobuf From SandishKumarHN (sanysandish@gmail.com) and Mohan Parthasarathy (mposdev21@gmail.com) This PR follows the main PR https://github.com/apache/spark/pull/37972 The following is an example of how to use from_protobuf and to_protobuf in PySpark. ```python data = [("1", (2, "Alice", 109200))] ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>" df = spark.createDataFrame(data, ddl_schema) desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726' ... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61' ... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121' ... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363' ... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707' ... '26F746F33') import tempfile # Writing a protobuf description into a file, generated by using # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file with tempfile.TemporaryDirectory() as tmp_dir: ... desc_file_path = "%s/pyspark_test.desc" % tmp_dir ... with open(desc_file_path, "wb") as f: ... _ = f.write(bytearray.fromhex(desc_hex)) ... f.flush() ... message_name = 'SimpleMessage' ... proto_df = df.select(to_protobuf(df.value, ... desc_file_path, message_name).alias("value")) ... proto_df.show(truncate=False) ... proto_df = proto_df.select(from_protobuf(proto_df.value, ... desc_file_path, message_name).alias("value")) ...
proto_df.show(truncate=False) +----------------------------------------+ |value | +----------------------------------------+ |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]| +----------------------------------------+ +------------------+ |value | +------------------+ |{2, Alice, 109200}| +------------------+ ``` ### Tests Covered - from_protobuf / to_protobuf (functions.py) Closes #38212 from SandishKumarHN/PYSPARK_PROTOBUF. Authored-by: SandishKumarHN <sanysand...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .github/workflows/build_and_test.yml | 2 +- .../src/test/resources/protobuf/pyspark_test.proto | 32 +++ dev/sparktestsupport/modules.py | 14 +- dev/sparktestsupport/utils.py | 16 +- python/docs/source/reference/pyspark.sql/index.rst | 1 + .../pyspark.sql/{index.rst => protobuf.rst} | 30 +-- python/pyspark/sql/protobuf/__init__.py | 18 ++ python/pyspark/sql/protobuf/functions.py | 215 +++++++++++++++++++++ 8 files changed, 296 insertions(+), 32 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index f6f5f026537..64dbe30012c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -153,7 +153,7 @@ jobs: streaming, sql-kafka-0-10, streaming-kafka-0-10, mllib-local, mllib, yarn, mesos, kubernetes, hadoop-cloud, spark-ganglia-lgpl, - connect + connect, protobuf # Here, we split Hive and SQL tests into some of slow ones and the rest of them. included-tags: [""] excluded-tags: [""] diff --git a/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto b/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto new file mode 100644 index 00000000000..8750371349a --- /dev/null +++ b/connector/protobuf/src/test/resources/protobuf/pyspark_test.proto @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// TODO(SPARK-40777): Instead of saving .desc files in resources, generate during build. +// To compile and create test class: +// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto +// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/pyspark_test.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/pyspark_test.proto + +syntax = "proto3"; + +package org.apache.spark.sql.protobuf; +option java_outer_classname = "SimpleMessageProtos"; + + +message SimpleMessage { + int32 age = 1; + string name = 2; + int64 score = 3; +} \ No newline at end of file diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index e4a515d203c..2a427139148 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -282,6 +282,17 @@ connect = Module( ], ) +protobuf = Module( + name="protobuf", + dependencies=[sql], + source_file_regexes=[ + "connector/protobuf", + ], + sbt_test_goals=[ + "protobuf/test", + ], +) + sketch = Module( name="sketch", dependencies=[tags], @@ -423,7 +434,7 @@ pyspark_core = Module( pyspark_sql = Module( name="pyspark-sql", - 
dependencies=[pyspark_core, hive, avro], + dependencies=[pyspark_core, hive, avro, protobuf], source_file_regexes=["python/pyspark/sql"], python_test_goals=[ # doctests @@ -443,6 +454,7 @@ pyspark_sql = Module( "pyspark.sql.udf", "pyspark.sql.window", "pyspark.sql.avro.functions", + "pyspark.sql.protobuf.functions", "pyspark.sql.pandas.conversion", "pyspark.sql.pandas.map_ops", "pyspark.sql.pandas.group_ops", diff --git a/dev/sparktestsupport/utils.py b/dev/sparktestsupport/utils.py index 11d64c4f0bc..37e023aaa63 100755 --- a/dev/sparktestsupport/utils.py +++ b/dev/sparktestsupport/utils.py @@ -108,23 +108,23 @@ def determine_modules_to_test(changed_modules, deduplicated=True): ['graphx', 'examples'] >>> [x.name for x in determine_modules_to_test([modules.sql])] ... # doctest: +NORMALIZE_WHITESPACE - ['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'sql-kafka-0-10', - 'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr', 'pyspark-connect', - 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml'] + ['sql', 'avro', 'connect', 'docker-integration-tests', 'hive', 'mllib', 'protobuf', + 'sql-kafka-0-10', 'examples', 'hive-thriftserver', 'pyspark-sql', 'repl', 'sparkr', + 'pyspark-connect', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-ml'] >>> sorted([x.name for x in determine_modules_to_test( ... [modules.sparkr, modules.sql], deduplicated=False)]) ... # doctest: +NORMALIZE_WHITESPACE ['avro', 'connect', 'docker-integration-tests', 'examples', 'hive', 'hive-thriftserver', - 'mllib', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', + 'mllib', 'protobuf', 'pyspark-connect', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-sql', 'repl', 'sparkr', 'sql', 'sql-kafka-0-10'] >>> sorted([x.name for x in determine_modules_to_test( ... [modules.sql, modules.core], deduplicated=False)]) ... 
# doctest: +NORMALIZE_WHITESPACE ['avro', 'catalyst', 'connect', 'core', 'docker-integration-tests', 'examples', 'graphx', - 'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'pyspark-connect', 'pyspark-core', - 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', 'pyspark-resource', - 'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql', 'sql-kafka-0-10', - 'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl'] + 'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'protobuf', 'pyspark-connect', + 'pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-pandas-slow', + 'pyspark-resource', 'pyspark-sql', 'pyspark-streaming', 'repl', 'root', 'sparkr', 'sql', + 'sql-kafka-0-10', 'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl'] """ modules_to_test = set() for module in changed_modules: diff --git a/python/docs/source/reference/pyspark.sql/index.rst b/python/docs/source/reference/pyspark.sql/index.rst index 52aca086cb4..fc4569486a7 100644 --- a/python/docs/source/reference/pyspark.sql/index.rst +++ b/python/docs/source/reference/pyspark.sql/index.rst @@ -40,3 +40,4 @@ This page gives an overview of all public Spark SQL API. avro observation udf + protobuf diff --git a/python/docs/source/reference/pyspark.sql/index.rst b/python/docs/source/reference/pyspark.sql/protobuf.rst similarity index 72% copy from python/docs/source/reference/pyspark.sql/index.rst copy to python/docs/source/reference/pyspark.sql/protobuf.rst index 52aca086cb4..0ba3d56c4c3 100644 --- a/python/docs/source/reference/pyspark.sql/index.rst +++ b/python/docs/source/reference/pyspark.sql/protobuf.rst @@ -16,27 +16,13 @@ under the License. -========= -Spark SQL -========= +======== +Protobuf +======== +.. currentmodule:: pyspark.sql.protobuf.functions -This page gives an overview of all public Spark SQL API. +.. autosummary:: + :toctree: api/ -.. 
toctree:: - :maxdepth: 2 - - core_classes - spark_session - configuration - io - dataframe - column - data_types - row - functions - window - grouping - catalog - avro - observation - udf + from_protobuf + to_protobuf diff --git a/python/pyspark/sql/protobuf/__init__.py b/python/pyspark/sql/protobuf/__init__.py new file mode 100644 index 00000000000..ac530a5495f --- /dev/null +++ b/python/pyspark/sql/protobuf/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +__all__ = ["functions"] diff --git a/python/pyspark/sql/protobuf/functions.py b/python/pyspark/sql/protobuf/functions.py new file mode 100644 index 00000000000..9f8b90095df --- /dev/null +++ b/python/pyspark/sql/protobuf/functions.py @@ -0,0 +1,215 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A collections of builtin protobuf functions +""" + + +from typing import Dict, Optional, TYPE_CHECKING +from pyspark import SparkContext +from pyspark.sql.column import Column, _to_java_column +from pyspark.util import _print_missing_jar + +if TYPE_CHECKING: + from pyspark.sql._typing import ColumnOrName + + +def from_protobuf( + data: "ColumnOrName", + descFilePath: str, + messageName: str, + options: Optional[Dict[str, str]] = None, +) -> Column: + """ + Converts a binary column of Protobuf format into its corresponding catalyst value. + The specified schema must match the read data, otherwise the behavior is undefined: + it may fail or return arbitrary result. + To deserialize the data with a compatible and evolved schema, the expected + Protobuf schema can be set via the option protobuf descriptor. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + data : :class:`~pyspark.sql.Column` or str + the binary column. + descFilePath : str + the protobuf descriptor in Message GeneratedMessageV3 format. + messageName: str + the protobuf message name to look for in descriptor file. + options : dict, optional + options to control how the protobuf record is parsed. + + Notes + ----- + Protobuf functionality is provided as an pluggable external module. 
+ + Examples + -------- + >>> import tempfile + >>> data = [("1", (2, "Alice", 109200))] + >>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>" + >>> df = spark.createDataFrame(data, ddl_schema) + >>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726' + ... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61' + ... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121' + ... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363' + ... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707' + ... '26F746F33') + >>> # Writing a protobuf description into a file, generated by using + >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file + >>> with tempfile.TemporaryDirectory() as tmp_dir: + ... desc_file_path = "%s/pyspark_test.desc" % tmp_dir + ... with open(desc_file_path, "wb") as f: + ... _ = f.write(bytearray.fromhex(desc_hex)) + ... f.flush() + ... message_name = 'SimpleMessage' + ... proto_df = df.select( + ... to_protobuf(df.value, desc_file_path, message_name).alias("value")) + ... proto_df.show(truncate=False) + ... proto_df = proto_df.select( + ... from_protobuf(proto_df.value, desc_file_path, message_name).alias("value")) + ... 
proto_df.show(truncate=False) + +----------------------------------------+ + |value | + +----------------------------------------+ + |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]| + +----------------------------------------+ + +------------------+ + |value | + +------------------+ + |{2, Alice, 109200}| + +------------------+ + """ + + sc = SparkContext._active_spark_context + assert sc is not None and sc._jvm is not None + try: + jc = sc._jvm.org.apache.spark.sql.protobuf.functions.from_protobuf( + _to_java_column(data), descFilePath, messageName, options or {} + ) + except TypeError as e: + if str(e) == "'JavaPackage' object is not callable": + _print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version) + raise + return Column(jc) + + +def to_protobuf(data: "ColumnOrName", descFilePath: str, messageName: str) -> Column: + """ + Converts a column into binary of protobuf format. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + data : :class:`~pyspark.sql.Column` or str + the data column. + descFilePath : str + the protobuf descriptor in Message GeneratedMessageV3 format. + messageName: str + the protobuf message name to look for in descriptor file. + + Notes + ----- + Protobuf functionality is provided as an pluggable external module + + Examples + -------- + >>> import tempfile + >>> data = [([(2, "Alice", 13093020)])] + >>> ddl_schema = "value struct<age: INTEGER, name: STRING, score: LONG>" + >>> df = spark.createDataFrame(data, ddl_schema) + >>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726' + ... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61' + ... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121' + ... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363' + ... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707' + ... 
'26F746F33') + >>> # Writing a protobuf description into a file, generated by using + >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file + >>> with tempfile.TemporaryDirectory() as tmp_dir: + ... desc_file_path = "%s/pyspark_test.desc" % tmp_dir + ... with open(desc_file_path, "wb") as f: + ... _ = f.write(bytearray.fromhex(desc_hex)) + ... f.flush() + ... message_name = 'SimpleMessage' + ... proto_df = df.select( + ... to_protobuf(df.value, desc_file_path, message_name).alias("suite")) + ... proto_df.show(truncate=False) + +-------------------------------------------+ + |suite | + +-------------------------------------------+ + |[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]| + +-------------------------------------------+ + """ + sc = SparkContext._active_spark_context + assert sc is not None and sc._jvm is not None + try: + jc = sc._jvm.org.apache.spark.sql.protobuf.functions.to_protobuf( + _to_java_column(data), descFilePath, messageName + ) + except TypeError as e: + if str(e) == "'JavaPackage' object is not callable": + _print_missing_jar("Protobuf", "protobuf", "protobuf", sc.version) + raise + return Column(jc) + + +def _test() -> None: + import os + import sys + from pyspark.testing.utils import search_jar + + protobuf_jar = search_jar("connector/protobuf", "spark-protobuf-assembly-", "spark-protobuf") + if protobuf_jar is None: + print( + "Skipping all Protobuf Python tests as the optional Protobuf project was " + "not compiled into a JAR. To run these tests, " + "you need to build Spark with 'build/sbt package' or " + "'build/mvn package' before running this test." 
+ ) + sys.exit(0) + else: + existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell") + jars_args = "--jars %s" % protobuf_jar + os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args]) + + import doctest + from pyspark.sql import SparkSession + import pyspark.sql.protobuf.functions + + globs = pyspark.sql.protobuf.functions.__dict__.copy() + spark = ( + SparkSession.builder.master("local[2]") + .appName("sql.protobuf.functions tests") + .getOrCreate() + ) + globs["spark"] = spark + (failure_count, test_count) = doctest.testmod( + pyspark.sql.protobuf.functions, + globs=globs, + optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE, + ) + spark.stop() + if failure_count: + sys.exit(-1) + + +if __name__ == "__main__": + _test() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org