This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ff9d41abaff [SPARK-43799][PYTHON] Add descriptor binary option to Pyspark Protobuf API
ff9d41abaff is described below

commit ff9d41abaffcbd6f0c26ce5be9d2324fe9f01d5c
Author: Raghu Angadi <raghu.ang...@databricks.com>
AuthorDate: Tue May 30 09:01:32 2023 +0900

    [SPARK-43799][PYTHON] Add descriptor binary option to Pyspark Protobuf API
    
    ### What changes were proposed in this pull request?
    This updates the PySpark Protobuf API to allow passing a binary FileDescriptorSet rather than a file name. It is a Python follow-up to the feature implemented in Scala in #41192.
    
    ### Why are the changes needed?
      - This gives PySpark users the flexibility to provide the binary descriptor set directly (a usage sketch follows this list).
      - Even when users pass a file path, PySpark now reads the descriptor file in Python and hands the bytes to Scala, so the file no longer needs to be read on the Scala side.
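
    A minimal usage sketch of the new option (the descriptor file `events.desc`,
    message name `Event`, and DataFrame `df` with a binary `value` column are
    hypothetical placeholders):

        from pyspark.sql.protobuf.functions import from_protobuf

        # A FileDescriptorSet produced beforehand with, e.g.:
        #   protoc --include_imports --descriptor_set_out=events.desc event.proto
        with open("events.desc", "rb") as f:
            desc_bytes = f.read()

        # Pass the raw descriptor bytes instead of a file path.
        parsed_df = df.select(
            from_protobuf(df.value, "Event", binaryDescriptorSet=desc_bytes).alias("event")
        )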
    
    ### Does this PR introduce _any_ user-facing change?
    
      - This adds an extra `binaryDescriptorSet` argument to the `from_protobuf()` and `to_protobuf()` APIs; the updated signatures are sketched below.
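
    Condensed signatures as they appear in the diff below (type annotations
    omitted here for brevity; when both `descFilePath` and `binaryDescriptorSet`
    are supplied, the binary descriptor takes precedence):

        def from_protobuf(
            data, messageName, descFilePath=None, options=None, binaryDescriptorSet=None
        ) -> Column: ...

        def to_protobuf(
            data, messageName, descFilePath=None, options=None, binaryDescriptorSet=None
        ) -> Column: ...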
    
    ### How was this patch tested?
      - Doc tests
      - Manual tests
    
    Closes #41343 from rangadi/py-proto-file-buffer.
    
    Authored-by: Raghu Angadi <raghu.ang...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/protobuf/functions.py | 74 +++++++++++++++++++++++++++-----
 1 file changed, 64 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/sql/protobuf/functions.py b/python/pyspark/sql/protobuf/functions.py
index a303cf91493..42165938eb7 100644
--- a/python/pyspark/sql/protobuf/functions.py
+++ b/python/pyspark/sql/protobuf/functions.py
@@ -37,13 +37,17 @@ def from_protobuf(
     messageName: str,
     descFilePath: Optional[str] = None,
     options: Optional[Dict[str, str]] = None,
+    binaryDescriptorSet: Optional[bytes] = None,
 ) -> Column:
     """
     Converts a binary column of Protobuf format into its corresponding catalyst value.
-    The Protobuf definition is provided in one of these two ways:
+    The Protobuf definition is provided in one of these ways:
 
        - Protobuf descriptor file: E.g. a descriptor file created with
           `protoc --include_imports --descriptor_set_out=abc.desc abc.proto`
+       - Protobuf descriptor as binary: Rather than a file path as in the previous option,
+         we can provide the binary content of the file. This allows flexibility in how the
+         descriptor set is created and fetched.
       - Jar containing Protobuf Java class: The jar containing Java class should be shaded.
          Specifically, `com.google.protobuf.*` should be shaded to
          `org.sparkproject.spark_protobuf.protobuf.*`.
@@ -52,6 +56,9 @@ def from_protobuf(
 
     .. versionadded:: 3.4.0
 
+    .. versionchanged:: 3.5.0
+        Supports the `binaryDescriptorSet` arg to pass a binary descriptor directly.
+
     Parameters
     ----------
     data : :class:`~pyspark.sql.Column` or str
@@ -61,9 +68,11 @@ def from_protobuf(
         The Protobuf class name when descFilePath parameter is not set.
         E.g. `com.example.protos.ExampleEvent`.
     descFilePath : str, optional
-        The protobuf descriptor file.
+        The Protobuf descriptor file.
     options : dict, optional
         options to control how the protobuf record is parsed.
+    binaryDescriptorSet : bytes, optional
+        The Protobuf `FileDescriptorSet` serialized as binary.
 
     Notes
     -----
@@ -92,9 +101,14 @@ def from_protobuf(
     ...         proto_df = df.select(
     ...             to_protobuf(df.value, message_name, desc_file_path).alias("value"))
     ...         proto_df.show(truncate=False)
-    ...         proto_df = proto_df.select(
+    ...         proto_df_1 = proto_df.select( # With file name for descriptor
     ...             from_protobuf(proto_df.value, message_name, desc_file_path).alias("value"))
-    ...         proto_df.show(truncate=False)
+    ...         proto_df_1.show(truncate=False)
+    ...         proto_df_2 = proto_df.select( # With binary for descriptor
+    ...             from_protobuf(proto_df.value, message_name,
+    ...                           binaryDescriptorSet=bytearray.fromhex(desc_hex))
+    ...             .alias("value"))
+    ...         proto_df_2.show(truncate=False)
     +----------------------------------------+
     |value                                   |
     +----------------------------------------+
@@ -105,6 +119,11 @@ def from_protobuf(
     +------------------+
     |{2, Alice, 109200}|
     +------------------+
+    +------------------+
+    |value             |
+    +------------------+
+    |{2, Alice, 109200}|
+    +------------------+
     >>> data = [([(1668035962, 2020)])]
     >>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
     >>> df = spark.createDataFrame(data, ddl_schema)
@@ -122,9 +141,14 @@ def from_protobuf(
 
     sc = get_active_spark_context()
     try:
-        if descFilePath is not None:
+        binary_proto = None
+        if binaryDescriptorSet is not None:
+            binary_proto = binaryDescriptorSet
+        elif descFilePath is not None:
+            binary_proto = _read_descriptor_set_file(descFilePath)
+        if binary_proto is not None:
             jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.from_protobuf(
-                _to_java_column(data), messageName, descFilePath, options or {}
+                _to_java_column(data), messageName, binary_proto, options or {}
             )
         else:
             jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.from_protobuf(
@@ -142,13 +166,17 @@ def to_protobuf(
     messageName: str,
     descFilePath: Optional[str] = None,
     options: Optional[Dict[str, str]] = None,
+    binaryDescriptorSet: Optional[bytes] = None,
 ) -> Column:
     """
     Converts a column into binary of protobuf format. The Protobuf definition is provided in one
-    of these two ways:
+    of these ways:
 
        - Protobuf descriptor file: E.g. a descriptor file created with
           `protoc --include_imports --descriptor_set_out=abc.desc abc.proto`
+       - Protobuf descriptor as binary: Rather than a file path as in the previous option,
+         we can provide the binary content of the file. This allows flexibility in how the
+         descriptor set is created and fetched.
       - Jar containing Protobuf Java class: The jar containing Java class should be shaded.
          Specifically, `com.google.protobuf.*` should be shaded to
          `org.sparkproject.spark_protobuf.protobuf.*`.
@@ -157,6 +185,9 @@ def to_protobuf(
 
     .. versionadded:: 3.4.0
 
+    .. versionchanged:: 3.5.0
+        Supports the `binaryDescriptorSet` arg to pass a binary descriptor directly.
+
     Parameters
     ----------
     data : :class:`~pyspark.sql.Column` or str
@@ -168,6 +199,8 @@ def to_protobuf(
     descFilePath : str, optional
         the Protobuf descriptor file.
     options : dict, optional
+    binaryDescriptorSet : bytes, optional
+        The Protobuf `FileDescriptorSet` serialized as binary.
 
     Notes
     -----
@@ -193,9 +226,19 @@ def to_protobuf(
     ...         _ = f.write(bytearray.fromhex(desc_hex))
     ...         f.flush()
     ...         message_name = 'SimpleMessage'
-    ...         proto_df = df.select(
+    ...         proto_df = df.select( # With file name for descriptor
     ...             to_protobuf(df.value, message_name, desc_file_path).alias("suite"))
     ...         proto_df.show(truncate=False)
+    ...         proto_df_2 = df.select( # With binary for descriptor
+    ...             to_protobuf(df.value, message_name,
+    ...                         binaryDescriptorSet=bytearray.fromhex(desc_hex))
+    ...             .alias("suite"))
+    ...         proto_df_2.show(truncate=False)
+    +-------------------------------------------+
+    |suite                                      |
+    +-------------------------------------------+
+    |[08 02 12 05 41 6C 69 63 65 18 9C 91 9F 06]|
+    +-------------------------------------------+
     +-------------------------------------------+
     |suite                                      |
     +-------------------------------------------+
@@ -216,9 +259,14 @@ def to_protobuf(
 
     sc = get_active_spark_context()
     try:
-        if descFilePath is not None:
+        binary_proto = None
+        if binaryDescriptorSet is not None:
+            binary_proto = binaryDescriptorSet
+        elif descFilePath is not None:
+            binary_proto = _read_descriptor_set_file(descFilePath)
+        if binary_proto is not None:
             jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.to_protobuf(
-                _to_java_column(data), messageName, descFilePath, options or {}
+                _to_java_column(data), messageName, binary_proto, options or {}
             )
         else:
             jc = cast(JVMView, sc._jvm).org.apache.spark.sql.protobuf.functions.to_protobuf(
@@ -232,6 +280,12 @@ def to_protobuf(
     return Column(jc)
 
 
+def _read_descriptor_set_file(filePath: str) -> bytes:
+    # TODO(SPARK-43847): Throw structured errors like "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND" etc.
+    with open(filePath, "rb") as f:
+        return f.read()
+
+
 def _test() -> None:
     import os
     import sys

