This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ff9d41abaff [SPARK-43799][PYTHON] Add descriptor binary option to Pyspark Protobuf API ff9d41abaff is described below commit ff9d41abaffcbd6f0c26ce5be9d2324fe9f01d5c Author: Raghu Angadi <raghu.ang...@databricks.com> AuthorDate: Tue May 30 09:01:32 2023 +0900 [SPARK-43799][PYTHON] Add descriptor binary option to Pyspark Protobuf API ### What changes were proposed in this pull request? This updates the PySpark Protobuf API to allow passing a binary FileDescriptorSet rather than a file name. This is a Python follow-up to the feature implemented in Scala in #41192. ### Why are the changes needed? - This gives PySpark users the flexibility to provide a binary descriptor set directly. - Even when users provide a file path, PySpark now reads the descriptor file in Python and passes its binary content to Scala instead of the file name, so Scala no longer has to read the file. ### Does this PR introduce _any_ user-facing change? - This adds an optional `binaryDescriptorSet` argument to the `from_protobuf()` and `to_protobuf()` APIs. ### How was this patch tested? - Doc tests - Manual tests Closes #41343 from rangadi/py-proto-file-buffer. Authored-by: Raghu Angadi <raghu.ang...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/sql/protobuf/functions.py | 74 +++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/protobuf/functions.py b/python/pyspark/sql/protobuf/functions.py index a303cf91493..42165938eb7 100644 --- a/python/pyspark/sql/protobuf/functions.py +++ b/python/pyspark/sql/protobuf/functions.py @@ -37,13 +37,17 @@ def from_protobuf( messageName: str, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None, + binaryDescriptorSet: Optional[bytes] = None, ) -> Column: """ Converts a binary column of Protobuf format into its corresponding catalyst value.
- The Protobuf definition is provided in one of these two ways: + The Protobuf definition is provided in one of these ways: - Protobuf descriptor file: E.g. a descriptor file created with `protoc --include_imports --descriptor_set_out=abc.desc abc.proto` + - Protobuf descriptor as binary: Rather than file path as in previous option, + we can provide the binary content of the file. This allows flexibility in how the + descriptor set is created and fetched. - Jar containing Protobuf Java class: The jar containing Java class should be shaded. Specifically, `com.google.protobuf.*` should be shaded to `org.sparkproject.spark_protobuf.protobuf.*`. @@ -52,6 +56,9 @@ def from_protobuf( .. versionadded:: 3.4.0 + .. versionchanged:: 3.5.0 + Supports `binaryDescriptorSet` arg to pass binary descriptor directly. + Parameters ---------- data : :class:`~pyspark.sql.Column` or str @@ -61,9 +68,11 @@ def from_protobuf( The Protobuf class name when descFilePath parameter is not set. E.g. `com.example.protos.ExampleEvent`. descFilePath : str, optional - The protobuf descriptor file. + The Protobuf descriptor file. options : dict, optional options to control how the protobuf record is parsed. + binaryDescriptorSet: bytes, optional + The Protobuf `FileDescriptorSet` serialized as binary. Notes ----- @@ -92,9 +101,14 @@ def from_protobuf( ... proto_df = df.select( ... to_protobuf(df.value, message_name, desc_file_path).alias("value")) ... proto_df.show(truncate=False) - ... proto_df = proto_df.select( + ... proto_df_1 = proto_df.select( # With file name for descriptor ... from_protobuf(proto_df.value, message_name, desc_file_path).alias("value")) - ... proto_df.show(truncate=False) + ... proto_df_1.show(truncate=False) + ... proto_df_2 = proto_df.select( # With binary for descriptor + ... from_protobuf(proto_df.value, message_name, + ... binaryDescriptorSet = bytearray.fromhex(desc_hex)) + ... .alias("value")) + ... 
proto_df_2.show(truncate=False) +----------------------------------------+ |value | +----------------------------------------+ @@ -105,6 +119,11 @@ def from_protobuf( +------------------+ |{2, Alice, 109200}| +------------------+ + +------------------+ + |value | + +------------------+ + |{2, Alice, 109200}| + +------------------+ >>> data = [([(1668035962, 2020)])] >>> ddl_schema = "value struct<seconds: LONG, nanos: INT>" >>> df = spark.createDataFrame(data, ddl_schema) @@ -122,9 +141,14 @@ def from_protobuf( sc = get_active_spark_context() try: - if descFilePath is not None: + binary_proto = None + if binaryDescriptorSet is not None: + binary_proto = binaryDescriptorSet + elif descFilePath is not None: + binary_proto = _read_descriptor_set_file(descFilePath) + if binary_proto is not None: jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.from_protobuf( - _to_java_column(data), messageName, descFilePath, options or {} + _to_java_column(data), messageName, binary_proto, options or {} ) else: jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.from_protobuf( @@ -142,13 +166,17 @@ def to_protobuf( messageName: str, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None, + binaryDescriptorSet: Optional[bytes] = None, ) -> Column: """ Converts a column into binary of protobuf format. The Protobuf definition is provided in one - of these two ways: + of these ways: - Protobuf descriptor file: E.g. a descriptor file created with `protoc --include_imports --descriptor_set_out=abc.desc abc.proto` + - Protobuf descriptor as binary: Rather than file path as in previous option, + we can provide the binary content of the file. This allows flexibility in how the + descriptor set is created and fetched. - Jar containing Protobuf Java class: The jar containing Java class should be shaded. Specifically, `com.google.protobuf.*` should be shaded to `org.sparkproject.spark_protobuf.protobuf.*`. 
@@ -157,6 +185,9 @@ def to_protobuf( .. versionadded:: 3.4.0 + .. versionchanged:: 3.5.0 + Supports `binaryDescriptorSet` arg to pass binary descriptor directly. + Parameters ---------- data : :class:`~pyspark.sql.Column` or str @@ -168,6 +199,8 @@ def to_protobuf( descFilePath : str, optional the Protobuf descriptor file. options : dict, optional + binaryDescriptorSet: bytes, optional + The Protobuf `FileDescriptorSet` serialized as binary. Notes ----- @@ -193,9 +226,19 @@ def to_protobuf( ... _ = f.write(bytearray.fromhex(desc_hex)) ... f.flush() ... message_name = 'SimpleMessage' - ... proto_df = df.select( + ... proto_df = df.select( # With file name for descriptor ... to_protobuf(df.value, message_name, desc_file_path).alias("suite")) ... proto_df.show(truncate=False) + ... proto_df_2 = df.select( # With binary for descriptor + ... to_protobuf(df.value, message_name, + ... binaryDescriptorSet=bytearray.fromhex(desc_hex)) + ... .alias("suite")) + ... proto_df_2.show(truncate=False) + +-------------------------------------------+ + |suite | + +-------------------------------------------+ + |[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]| + +-------------------------------------------+ +-------------------------------------------+ |suite | +-------------------------------------------+ @@ -216,9 +259,14 @@ def to_protobuf( sc = get_active_spark_context() try: - if descFilePath is not None: + binary_proto = None + if binaryDescriptorSet is not None: + binary_proto = binaryDescriptorSet + elif descFilePath is not None: + binary_proto = _read_descriptor_set_file(descFilePath) + if binary_proto is not None: jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.to_protobuf( - _to_java_column(data), messageName, descFilePath, options or {} + _to_java_column(data), messageName, binary_proto, options or {} ) else: jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.to_protobuf( @@ -232,6 +280,12 @@ def to_protobuf( return Column(jc) +def 
_read_descriptor_set_file(filePath: str) -> bytes: + # TODO(SPARK-43847): Throw structured errors like "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND" etc. + with open(filePath, "rb") as f: + return f.read() + + def _test() -> None: import os import sys --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org