This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f7994217d5a [SPARK-46736][PROTOBUF] retain empty message field in 
protobuf connector
3f7994217d5a is described below

commit 3f7994217d5a8d2816165459c1ce10d9b31bc7fd
Author: Chaoqin Li <chaoqin...@databricks.com>
AuthorDate: Tue Jan 30 11:02:35 2024 +0900

    [SPARK-46736][PROTOBUF] retain empty message field in protobuf connector
    
    ### What changes were proposed in this pull request?
    Since Spark doesn't allow an empty StructType, a field whose type is an
    empty proto message is dropped by default. This change introduces an option
    to retain an empty message field by inserting a dummy column.
    
    ### Why are the changes needed?
    In protobuf, it is common to have an empty message type without any fields
    as a placeholder. In some cases, people may not want to drop these empty
    message fields.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. The default behavior is still to drop an empty message field. The new
    option enables users to keep the empty message field, though they will
    observe a dummy column.
    
    ### How was this patch tested?
    Unit test and integration test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44643 from chaoqin-li1123/empty_proto.
    
    Authored-by: Chaoqin Li <chaoqin...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/protobuf/utils/ProtobufOptions.scala |  17 +++
 .../sql/protobuf/utils/SchemaConverters.scala      |  34 ++++--
 .../test/resources/protobuf/functions_suite.proto  |   9 ++
 .../sql/protobuf/ProtobufFunctionsSuite.scala      | 123 ++++++++++++++++++++-
 4 files changed, 171 insertions(+), 12 deletions(-)

diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala
index 5f8c42df365a..6644bce98293 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufOptions.scala
@@ -207,6 +207,23 @@ private[sql] class ProtobufOptions(
   //    nil => nil, Int32Value(0) => 0, Int32Value(100) => 100.
   val unwrapWellKnownTypes: Boolean =
     parameters.getOrElse("unwrap.primitive.wrapper.types", 
false.toString).toBoolean
+
+  // Since Spark doesn't allow writing empty StructType, empty proto message 
type will be
+  // dropped by default. Setting this option to true will insert a dummy 
column to empty proto
+  // message so that the empty message will be retained.
+  // For example, an empty message is used as field in another message:
+  //
+  // ```
+  // message A {}
+  // message B {A a = 1, string name = 2}
+  // ```
+  //
+  // By default, in the spark schema field a will be dropped, which result in 
schema
+  // b struct<name: string>
+  // If retain.empty.message.types=true, field a will be retained by inserting 
a dummy column.
+  // b struct<a struct<__dummy_field_in_empty_struct: string>, name: string>
+  val retainEmptyMessage: Boolean =
+    parameters.getOrElse("retain.empty.message.types", 
false.toString).toBoolean
 }
 
 private[sql] object ProtobufOptions {
diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
index b35aa153aaa1..feb5aed03451 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala
@@ -51,12 +51,13 @@ object SchemaConverters extends Logging {
   def toSqlTypeHelper(
       descriptor: Descriptor,
       protobufOptions: ProtobufOptions): SchemaType = {
-    SchemaType(
-      StructType(descriptor.getFields.asScala.flatMap(
-        structFieldFor(_,
-          Map(descriptor.getFullName -> 1),
-          protobufOptions: ProtobufOptions)).toArray),
-      nullable = true)
+    val fields = descriptor.getFields.asScala.flatMap(
+      structFieldFor(_,
+        Map(descriptor.getFullName -> 1),
+        protobufOptions: ProtobufOptions)).toSeq
+    if (fields.isEmpty && protobufOptions.retainEmptyMessage) {
+      
SchemaType(convertEmptyProtoToStructWithDummyField(descriptor.getFullName), 
nullable = true)
+    } else SchemaType(StructType(fields), nullable = true)
   }
 
   // existingRecordNames: Map[String, Int] used to track the depth of 
recursive fields and to
@@ -212,11 +213,15 @@ object SchemaConverters extends Logging {
           ).toSeq
           fields match {
             case Nil =>
-              log.info(
-                s"Dropping ${fd.getFullName} as it does not have any fields 
left " +
-                "likely due to recursive depth limit."
-              )
-              None
+              if (protobufOptions.retainEmptyMessage) {
+                Some(convertEmptyProtoToStructWithDummyField(fd.getFullName))
+              } else {
+                log.info(
+                  s"Dropping ${fd.getFullName} as it does not have any fields 
left " +
+                    "likely due to recursive depth limit."
+                )
+                None
+              }
             case fds => Some(StructType(fds))
           }
         }
@@ -230,4 +235,11 @@ object SchemaConverters extends Logging {
       case dt => StructField(fd.getName, dt, nullable = !fd.isRequired)
     }
   }
+
+  // Insert a dummy column to retain the empty message because
+  // spark doesn't allow empty struct type.
+  private def convertEmptyProtoToStructWithDummyField(desc: String): 
StructType = {
+    log.info(s"Keep $desc which is empty struct by inserting a dummy field.")
+    StructType(StructField("__dummy_field_in_empty_struct", StringType) :: Nil)
+  }
 }
diff --git 
a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto 
b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
index a643e91158eb..4cae7f9abf28 100644
--- a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
+++ b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto
@@ -269,6 +269,15 @@ message EventRecursiveB {
   OneOfEventWithRecursion recursiveOneOffInB = 3;
 }
 
+message EmptyProto {
+  // This is an empty proto
+}
+
+message EmptyProtoWrapper {
+  string name = 1;
+  EmptyProto empty_proto = 2;
+}
+
 message EmptyRecursiveProto {
   // This is a recursive proto with no fields. Used to test edge. Catalyst 
schema for this
   // should be "nothing" (i.e. completely dropped) irrespective of recursive 
limit.
diff --git 
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
index 5e9e737151fe..fb8a68f1812b 100644
--- 
a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
+++ 
b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
@@ -1124,7 +1124,78 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Prot
     }
   }
 
-  test("Corner case: empty recursive proto fields should be dropped") {
+  test("Retain empty proto fields when retain.empty.message.types=true") {
+    // When retain.empty.message.types=true, empty proto like 'message A {}' 
can be retained as
+    // a field by inserting a dummy column as sub column.
+    val options = Map("retain.empty.message.types" -> "true")
+
+    // EmptyProto at the top level. It will be an empty struct.
+    checkWithFileAndClassName("EmptyProto") {
+      case (name, descFilePathOpt) =>
+        val df = emptyBinaryDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt, 
options).as("empty_proto")
+        )
+        // Top level empty message is retained by adding dummy column to the 
schema.
+        assert(df.schema ==
+          structFromDDL("empty_proto struct<__dummy_field_in_empty_struct: 
string>"))
+    }
+
+    // Inner level empty message is retained by adding dummy column to the 
schema.
+    checkWithFileAndClassName("EmptyProtoWrapper") {
+      case (name, descFilePathOpt) =>
+        val df = emptyBinaryDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt, 
options).as("wrapper")
+        )
+        // Nested empty message is retained by adding dummy column to the 
schema.
+        assert(df.schema == structFromDDL("wrapper struct" +
+          "<name: string, empty_proto struct<__dummy_field_in_empty_struct: 
string>>"))
+    }
+  }
+
+  test("Write empty proto to parquet when retain.empty.message.types=true") {
+    // When retain.empty.message.types=true, empty proto like 'message A {}' 
can be retained
+    // as a field by inserting a dummy column as sub column, such schema can 
be written to parquet.
+    val options = Map("retain.empty.message.types" -> "true")
+    withTempDir { file =>
+      val binaryDF = Seq(
+        EmptyProtoWrapper.newBuilder.setName("my_name").build().toByteArray)
+        .toDF("binary")
+      checkWithFileAndClassName("EmptyProtoWrapper") {
+        case (name, descFilePathOpt) =>
+          val df = binaryDF.select(
+            from_protobuf_wrapper($"binary", name, descFilePathOpt, 
options).as("wrapper")
+          )
+          
df.write.format("parquet").mode("overwrite").save(file.getAbsolutePath)
+      }
+      val resultDF = spark.read.format("parquet").load(file.getAbsolutePath)
+      assert(resultDF.schema == structFromDDL("wrapper struct" +
+          "<name: string, empty_proto struct<__dummy_field_in_empty_struct: 
string>>"
+      ))
+      // The dummy column of empty proto should have null value.
+      checkAnswer(resultDF, Seq(Row(Row("my_name", null))))
+    }
+
+    // When top level message is empty, write to parquet.
+    withTempDir { file =>
+      val binaryDF = Seq(
+        EmptyProto.newBuilder.build().toByteArray)
+        .toDF("binary")
+      checkWithFileAndClassName("EmptyProto") {
+        case (name, descFilePathOpt) =>
+          val df = binaryDF.select(
+            from_protobuf_wrapper($"binary", name, descFilePathOpt, 
options).as("empty_proto")
+          )
+          
df.write.format("parquet").mode("overwrite").save(file.getAbsolutePath)
+      }
+      val resultDF = spark.read.format("parquet").load(file.getAbsolutePath)
+      assert(resultDF.schema ==
+        structFromDDL("empty_proto struct<__dummy_field_in_empty_struct: 
string>"))
+      // The dummy column of empty proto should have null value.
+      checkAnswer(resultDF, Seq(Row(Row(null))))
+    }
+  }
+
+  test("Corner case: empty recursive proto fields should be dropped by 
default") {
     // This verifies that a empty proto like 'message A { A a = 1}' are 
completely dropped
     // irrespective of max depth setting.
 
@@ -1150,6 +1221,56 @@ class ProtobufFunctionsSuite extends QueryTest with 
SharedSparkSession with Prot
     }
   }
 
+  test("Retain empty recursive proto fields when 
retain.empty.message.types=true") {
+    // This verifies that a empty proto like 'message A { A a = 1}' can be 
retained by
+    // inserting a dummy field.
+
+    val structWithDummyColumn =
+      StructType(StructField("__dummy_field_in_empty_struct", StringType) :: 
Nil)
+    val structWithRecursiveDepthEquals2 = StructType(
+      StructField("recursive_field", structWithDummyColumn)
+        :: StructField("recursive_array", ArrayType(structWithDummyColumn, 
containsNull = false))
+        :: Nil)
+    /*
+      The code below construct the expected schema with recursive depth set to 
3.
+      Note: If recursive depth change, the resulting schema of empty recursive 
proto will change.
+        root
+         |-- empty_proto: struct (nullable = true)
+         |    |-- recursive_field: struct (nullable = true)
+         |    |    |-- recursive_field: struct (nullable = true)
+         |    |    |    |-- __dummy_field_in_empty_struct: string (nullable = 
true)
+         |    |    |-- recursive_array: array (nullable = true)
+         |    |    |    |-- element: struct (containsNull = false)
+         |    |    |    |    |-- __dummy_field_in_empty_struct: string 
(nullable = true)
+         |    |-- recursive_array: array (nullable = true)
+         |    |    |-- element: struct (containsNull = false)
+         |    |    |    |-- recursive_field: struct (nullable = true)
+         |    |    |    |    |-- __dummy_field_in_empty_struct: string 
(nullable = true)
+         |    |    |    |-- recursive_array: array (nullable = true)
+         |    |    |    |    |-- element: struct (containsNull = false)
+         |    |    |    |    |    |-- __dummy_field_in_empty_struct: string 
(nullable = true)
+    */
+    val structWithRecursiveDepthEquals3 = StructType(
+      StructField("empty_proto",
+        StructType(
+          StructField("recursive_field", structWithRecursiveDepthEquals2) ::
+            StructField("recursive_array",
+              ArrayType(structWithRecursiveDepthEquals2, containsNull = false)
+          ) :: Nil
+        )
+      ) :: Nil
+    )
+
+    val options = Map("recursive.fields.max.depth" -> "3", 
"retain.empty.message.types" -> "true")
+    checkWithFileAndClassName("EmptyRecursiveProto") {
+      case (name, descFilePathOpt) =>
+        val df = emptyBinaryDF.select(
+          from_protobuf_wrapper($"binary", name, descFilePathOpt, 
options).as("empty_proto")
+        )
+        assert(df.schema == structWithRecursiveDepthEquals3)
+    }
+  }
+
   test("Converting Any fields to JSON") {
     // Verifies schema and deserialization when 'convert.any.fields.to.json' 
is set.
     checkWithFileAndClassName("ProtoWithAny") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to