This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new ab1443052347 [SPARK-46275] Protobuf: Return null in permissive mode when deserialization fails ab1443052347 is described below commit ab14430523473528bafa41d8f10bc33efbb74493 Author: Raghu Angadi <raghu.ang...@databricks.com> AuthorDate: Fri Dec 8 16:40:27 2023 +0900 [SPARK-46275] Protobuf: Return null in permissive mode when deserialization fails ### What changes were proposed in this pull request? This updates the behavior of the `from_protobuf()` built-in function when the underlying record fails to deserialize. * **Current behavior**: * By default, this would throw an error and the query fails. [This part is not changed in the PR] * When `mode` is set to 'PERMISSIVE' it returns a non-null struct with each of the inner fields set to null e.g. `{ "field_a": null, "field_b": null }` etc. * This is not very convenient to the users. They don't know if this was due to malformed record or if the input itself has null. It is very hard to check each field for null in SQL query (imagine a sql query with a struct that has 10 fields). * **New behavior** * When `mode` is set to 'PERMISSIVE' it simply returns `null`. ### Why are the changes needed? This makes it easier for users to detect and handle malformed records. ### Does this PR introduce _any_ user-facing change? Yes, but this does not change the contract. In fact, it clarifies it. ### How was this patch tested? - Unit tests are updated. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44214 from rangadi/protobuf-null. 
Authored-by: Raghu Angadi <raghu.ang...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 309c796876f310f8604292d84acc12e711ba7031) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../sql/protobuf/ProtobufDataToCatalyst.scala | 31 ++++------------------ .../ProtobufCatalystDataConversionSuite.scala | 13 +-------- 2 files changed, 6 insertions(+), 38 deletions(-) diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala index 5c4a5ff06896..d2417674837b 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala @@ -22,12 +22,12 @@ import scala.util.control.NonFatal import com.google.protobuf.DynamicMessage import com.google.protobuf.TypeRegistry -import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters} -import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType} +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType} private[sql] case class ProtobufDataToCatalyst( child: Expression, @@ -39,16 +39,8 @@ private[sql] case class ProtobufDataToCatalyst( override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) - override lazy val dataType: DataType = { - val dt = 
SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType - parseMode match { - // With PermissiveMode, the output Catalyst row might contain columns of null values for - // corrupt records, even if some of the columns are not nullable in the user-provided schema. - // Therefore we force the schema to be all nullable here. - case PermissiveMode => dt.asNullable - case _ => dt - } - } + override lazy val dataType: DataType = + SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType override def nullable: Boolean = true @@ -87,22 +79,9 @@ private[sql] case class ProtobufDataToCatalyst( mode } - @transient private lazy val nullResultRow: Any = dataType match { - case st: StructType => - val resultRow = new SpecificInternalRow(st.map(_.dataType)) - for (i <- 0 until st.length) { - resultRow.setNullAt(i) - } - resultRow - - case _ => - null - } - private def handleException(e: Throwable): Any = { parseMode match { - case PermissiveMode => - nullResultRow + case PermissiveMode => null case FailFastMode => throw QueryExecutionErrors.malformedProtobufMessageDetectedInMessageParsingError(e) case _ => diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala index b7f17fece5fa..62d0efd7459b 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala @@ -79,20 +79,9 @@ class ProtobufCatalystDataConversionSuite .eval() } - val expected = { - val expectedSchema = ProtobufUtils.buildDescriptor(descBytes, badSchema) - SchemaConverters.toSqlType(expectedSchema).dataType match { - case st: StructType => - Row.fromSeq((0 until st.length).map { _ => - null - }) - case _ => null - } - } - checkEvaluation( 
ProtobufDataToCatalyst(binary, badSchema, Some(descBytes), Map("mode" -> "PERMISSIVE")), - expected) + expected = null) } protected def prepareExpectedResult(expected: Any): Any = expected match { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org