This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 4745138601b7 [SPARK-46275][3.4] Protobuf: Return null in permissive mode when deserialization fails 4745138601b7 is described below commit 4745138601b74e805459bd240f748fcf3e7ddec2 Author: Raghu Angadi <raghu.ang...@databricks.com> AuthorDate: Fri Dec 8 14:40:03 2023 -0800 [SPARK-46275][3.4] Protobuf: Return null in permissive mode when deserialization fails This is a cherry-pick of #44214 into the 3.4 branch. From the original PR: ### What changes were proposed in this pull request? This updates the behavior of the `from_protobuf()` built-in function when the underlying record fails to deserialize. * **Current behavior**: * By default, this would throw an error and the query fails. [This part is not changed in the PR] * When `mode` is set to 'PERMISSIVE' it returns a non-null struct with each of the inner fields set to null e.g. `{ "field_a": null, "field_b": null }` etc. * This is not very convenient to the users. They don't know if this was due to a malformed record or if the input itself was null. It is very hard to check each field for null in a SQL query (imagine a SQL query with a struct that has 10 fields). * **New behavior** * When `mode` is set to 'PERMISSIVE' it simply returns `null`. ### Why are the changes needed? This makes it easier for users to detect and handle malformed records. ### Does this PR introduce _any_ user-facing change? Yes, but this does not change the contract. In fact, it clarifies it. ### How was this patch tested? - Unit tests are updated. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44265 from rangadi/protobuf-null-3.4. 
Authored-by: Raghu Angadi <raghu.ang...@databricks.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../sql/protobuf/ProtobufDataToCatalyst.scala | 31 ++++------------------ .../ProtobufCatalystDataConversionSuite.scala | 13 +-------- 2 files changed, 6 insertions(+), 38 deletions(-) diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala index da44f94d5eac..78e995190045 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala @@ -21,12 +21,12 @@ import scala.util.control.NonFatal import com.google.protobuf.DynamicMessage -import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters} -import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType} +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType} private[protobuf] case class ProtobufDataToCatalyst( child: Expression, @@ -38,16 +38,8 @@ private[protobuf] case class ProtobufDataToCatalyst( override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) - override lazy val dataType: DataType = { - val dt = SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType - parseMode match { - // With PermissiveMode, the output Catalyst row might contain columns of 
null values for - // corrupt records, even if some of the columns are not nullable in the user-provided schema. - // Therefore we force the schema to be all nullable here. - case PermissiveMode => dt.asNullable - case _ => dt - } - } + override lazy val dataType: DataType = + SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType override def nullable: Boolean = true @@ -75,22 +67,9 @@ private[protobuf] case class ProtobufDataToCatalyst( mode } - @transient private lazy val nullResultRow: Any = dataType match { - case st: StructType => - val resultRow = new SpecificInternalRow(st.map(_.dataType)) - for (i <- 0 until st.length) { - resultRow.setNullAt(i) - } - resultRow - - case _ => - null - } - private def handleException(e: Throwable): Any = { parseMode match { - case PermissiveMode => - nullResultRow + case PermissiveMode => null case FailFastMode => throw QueryExecutionErrors.malformedProtobufMessageDetectedInMessageParsingError(e) case _ => diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala index 904827174e56..5642c46c9263 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala @@ -78,20 +78,9 @@ class ProtobufCatalystDataConversionSuite .eval() } - val expected = { - val expectedSchema = ProtobufUtils.buildDescriptor(descFilePath, badSchema) - SchemaConverters.toSqlType(expectedSchema).dataType match { - case st: StructType => - Row.fromSeq((0 until st.length).map { _ => - null - }) - case _ => null - } - } - checkEvaluation( ProtobufDataToCatalyst(binary, badSchema, Some(descFilePath), Map("mode" -> "PERMISSIVE")), - expected) + expected = null) } protected def prepareExpectedResult(expected: 
Any): Any = expected match { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org