This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 4745138601b7 [SPARK-46275][3.4] Protobuf: Return null in permissive mode when deserialization fails
4745138601b7 is described below

commit 4745138601b74e805459bd240f748fcf3e7ddec2
Author: Raghu Angadi <raghu.ang...@databricks.com>
AuthorDate: Fri Dec 8 14:40:03 2023 -0800

    [SPARK-46275][3.4] Protobuf: Return null in permissive mode when deserialization fails
    
    This is a cherry-pick of #44214 into the 3.4 branch.
    
    From the original PR:
    
    ### What changes were proposed in this pull request?
    This updates the behavior of the `from_protobuf()` built-in function when the underlying record fails to deserialize.
    
      * **Current behavior**:
        * By default, this throws an error and the query fails. [This part is not changed in this PR]
        * When `mode` is set to 'PERMISSIVE', it returns a non-null struct with each of the inner fields set to null, e.g. `{ "field_a": null, "field_b": null }`.
           * This is not very convenient for users: they cannot tell whether a struct of nulls means the record was malformed or the input itself contained nulls, and checking every field for null in a SQL query is tedious (imagine a query over a struct with 10 fields).
    
      * **New behavior**
        * When `mode` is set to 'PERMISSIVE', it simply returns `null` (see the sketch below).
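
    A minimal usage sketch of the new behavior (not part of this patch; the DataFrame, message name, descriptor file path, and column names are hypothetical placeholders):

    ```scala
    import scala.collection.JavaConverters._

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.protobuf.functions.from_protobuf

    // Assume df is a DataFrame with a binary column "value" holding
    // protobuf-encoded bytes.
    val parsed = df.select(
      from_protobuf(
        col("value"),                      // binary protobuf payload
        "Event",                           // hypothetical message name
        "/tmp/event.desc",                 // hypothetical descriptor file path
        Map("mode" -> "PERMISSIVE").asJava // malformed records now yield null
      ).as("event"))
    ```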
    
    ### Why are the changes needed?
    This makes it easier for users to detect and handle malformed records.
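
    For example (a hypothetical continuation of the sketch above), a single null check on the whole struct now identifies malformed records, instead of a test on every inner field:

    ```scala
    // Rows where deserialization failed: the entire struct is null.
    val malformed = parsed.where(col("event").isNull)

    // Before this change, each inner field had to be tested, e.g.:
    //   parsed.where(col("event.field_a").isNull && col("event.field_b").isNull && ...)
    ```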
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, but this does not change the contract. In fact, it clarifies it.
    
    ### How was this patch tested?
     - Unit tests are updated.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44265 from rangadi/protobuf-null-3.4.
    
    Authored-by: Raghu Angadi <raghu.ang...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/protobuf/ProtobufDataToCatalyst.scala      | 31 ++++------------------
 .../ProtobufCatalystDataConversionSuite.scala      | 13 +--------
 2 files changed, 6 insertions(+), 38 deletions(-)

diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
index da44f94d5eac..78e995190045 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
@@ -21,12 +21,12 @@ import scala.util.control.NonFatal
 
 import com.google.protobuf.DynamicMessage
 
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}
-import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
 
 private[protobuf] case class ProtobufDataToCatalyst(
     child: Expression,
@@ -38,16 +38,8 @@ private[protobuf] case class ProtobufDataToCatalyst(
 
   override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
 
-  override lazy val dataType: DataType = {
-    val dt = SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType
-    parseMode match {
-      // With PermissiveMode, the output Catalyst row might contain columns of null values for
-      // corrupt records, even if some of the columns are not nullable in the user-provided schema.
-      // Therefore we force the schema to be all nullable here.
-      case PermissiveMode => dt.asNullable
-      case _ => dt
-    }
-  }
+  override lazy val dataType: DataType =
+    SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType
 
   override def nullable: Boolean = true
 
@@ -75,22 +67,9 @@ private[protobuf] case class ProtobufDataToCatalyst(
     mode
   }
 
-  @transient private lazy val nullResultRow: Any = dataType match {
-    case st: StructType =>
-      val resultRow = new SpecificInternalRow(st.map(_.dataType))
-      for (i <- 0 until st.length) {
-        resultRow.setNullAt(i)
-      }
-      resultRow
-
-    case _ =>
-      null
-  }
-
   private def handleException(e: Throwable): Any = {
     parseMode match {
-      case PermissiveMode =>
-        nullResultRow
+      case PermissiveMode => null
       case FailFastMode =>
        throw QueryExecutionErrors.malformedProtobufMessageDetectedInMessageParsingError(e)
       case _ =>
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index 904827174e56..5642c46c9263 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -78,20 +78,9 @@ class ProtobufCatalystDataConversionSuite
         .eval()
     }
 
-    val expected = {
-      val expectedSchema = ProtobufUtils.buildDescriptor(descFilePath, badSchema)
-      SchemaConverters.toSqlType(expectedSchema).dataType match {
-        case st: StructType =>
-          Row.fromSeq((0 until st.length).map { _ =>
-            null
-          })
-        case _ => null
-      }
-    }
-
     checkEvaluation(
      ProtobufDataToCatalyst(binary, badSchema, Some(descFilePath), Map("mode" -> "PERMISSIVE")),
-      expected)
+      expected = null)
   }
 
   protected def prepareExpectedResult(expected: Any): Any = expected match {

