This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 1cba3b98160 [SPARK-42236][SQL] Refine `NULLABLE_ARRAY_OR_MAP_ELEMENT` 1cba3b98160 is described below commit 1cba3b98160ad9d7cdf29e84ff0191598177835c Author: itholic <haejoon....@databricks.com> AuthorDate: Tue Jan 31 19:35:57 2023 +0300 [SPARK-42236][SQL] Refine `NULLABLE_ARRAY_OR_MAP_ELEMENT` ### What changes were proposed in this pull request? This PR proposes to refine `NULLABLE_ARRAY_OR_MAP_ELEMENT` into a main-class/sub-class structure: `NOT_NULL_CONSTRAINT_VIOLATION` - `ARRAY_ELEMENT` - `MAP_VALUE` ### Why are the changes needed? The name of the error class is misleading, and we can make it more generic so that it can be reused in various situations. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Updated and added unit tests. Closes #39804 from itholic/NULLABLE_ARRAY_OR_MAP_ELEMENT. Authored-by: itholic <haejoon....@databricks.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../spark/sql/protobuf/ProtobufDeserializer.scala | 5 +++-- core/src/main/resources/error/error-classes.json | 24 ++++++++++++++++------ .../plans/logical/basicLogicalOperators.scala | 4 ++-- .../spark/sql/errors/QueryCompilationErrors.scala | 16 +++++++++++++-- .../apache/spark/sql/DataFrameToSchemaSuite.scala | 14 ++++++++++++- 5 files changed, 50 insertions(+), 13 deletions(-) diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala index 224e22c0f52..37278fab8a3 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala @@ -91,7 +91,8 @@ private[sql] class ProtobufDeserializer( val element = iterator.next() if (element == null) { if (!containsNull) { - throw 
QueryCompilationErrors.nullableArrayOrMapElementError(protoElementPath) + throw QueryCompilationErrors.notNullConstraintViolationArrayElementError( + protoElementPath) } else { elementUpdater.setNullAt(i) } @@ -129,7 +130,7 @@ private[sql] class ProtobufDeserializer( keyWriter(keyUpdater, i, field.getField(keyField)) if (field.getField(valueField) == null) { if (!valueContainsNull) { - throw QueryCompilationErrors.nullableArrayOrMapElementError(protoPath) + throw QueryCompilationErrors.notNullConstraintViolationMapValueError(protoPath) } else { valueUpdater.setNullAt(i) } diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 56d85ed866c..230b616800f 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -1038,6 +1038,24 @@ "Operation <operation> is not allowed for <tableIdentWithDB> because it is not a partitioned table." ] }, + "NOT_NULL_CONSTRAINT_VIOLATION" : { + "message" : [ + "Assigning a NULL is not allowed here." + ], + "subClass" : { + "ARRAY_ELEMENT" : { + "message" : [ + "The array <columnPath> is defined to contain only elements that are NOT NULL." + ] + }, + "MAP_VALUE" : { + "message" : [ + "The map <columnPath> is defined to contain only values that are NOT NULL." + ] + } + }, + "sqlState" : "42000" + }, "NO_HANDLER_FOR_UDAF" : { "message" : [ "No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead." @@ -1053,12 +1071,6 @@ "UDF class <className> doesn't implement any UDF interface." ] }, - "NULLABLE_ARRAY_OR_MAP_ELEMENT" : { - "message" : [ - "Array or map at <columnPath> contains nullable element while it's required to be non-nullable." - ], - "sqlState" : "42000" - }, "NULLABLE_COLUMN_OR_FIELD" : { "message" : [ "Column or field <name> is nullable while it's required to be non-nullable." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index a8dfb8fbd84..74929bf5d79 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -131,7 +131,7 @@ object Project { case (ArrayType(et, containsNull), expected: ArrayType) => if (containsNull & !expected.containsNull) { - throw QueryCompilationErrors.nullableArrayOrMapElementError(columnPath) + throw QueryCompilationErrors.notNullConstraintViolationArrayElementError(columnPath) } val param = NamedLambdaVariable("x", et, containsNull) val reconciledElement = reconcileColumnType( @@ -141,7 +141,7 @@ object Project { case (MapType(kt, vt, valueContainsNull), expected: MapType) => if (valueContainsNull & !expected.valueContainsNull) { - throw QueryCompilationErrors.nullableArrayOrMapElementError(columnPath) + throw QueryCompilationErrors.notNullConstraintViolationMapValueError(columnPath) } val keyParam = NamedLambdaVariable("key", kt, nullable = false) val valueParam = NamedLambdaVariable("value", vt, valueContainsNull) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 60c9aa8886e..ca97b707e3b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3183,9 +3183,21 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { messageParameters = Map("name" -> toSQLId(name))) } - def nullableArrayOrMapElementError(path: Seq[String]): Throwable = { + def notNullConstraintViolationArrayElementError(path: 
Seq[String]): Throwable = { new AnalysisException( - errorClass = "NULLABLE_ARRAY_OR_MAP_ELEMENT", + errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.ARRAY_ELEMENT", + messageParameters = Map("columnPath" -> toSQLId(path))) + } + + def notNullConstraintViolationMapValueError(path: Seq[String]): Throwable = { + new AnalysisException( + errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.MAP_VALUE", + messageParameters = Map("columnPath" -> toSQLId(path))) + } + + def notNullConstraintViolationStructFieldError(path: Seq[String]): Throwable = { + new AnalysisException( + errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.STRUCT_FIELD", messageParameters = Map("columnPath" -> toSQLId(path))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala index 6a3401073a0..5bbaebbd9ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala @@ -262,7 +262,7 @@ class DataFrameToSchemaSuite extends QueryTest with SharedSparkSession { val e = intercept[SparkThrowable](data.to(schema)) checkError( exception = e, - errorClass = "NULLABLE_ARRAY_OR_MAP_ELEMENT", + errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.ARRAY_ELEMENT", parameters = Map("columnPath" -> "`arr`")) } @@ -320,4 +320,16 @@ class DataFrameToSchemaSuite extends QueryTest with SharedSparkSession { assert(df.schema == schema) checkAnswer(df, Row(Map("a" -> Row("b", "a")))) } + + test("map value: incompatible map nullability") { + val m = MapType(StringType, StringType, valueContainsNull = false) + val schema = new StructType().add("map", m, nullable = false) + val data = Seq("a" -> null).toDF("i", "j").select(map($"i", $"j").as("map")) + assert(data.schema.fields(0).dataType.asInstanceOf[MapType].valueContainsNull) + val e = intercept[SparkThrowable](data.to(schema)) + checkError( + exception = e, + errorClass = 
"NOT_NULL_CONSTRAINT_VIOLATION.MAP_VALUE", + parameters = Map("columnPath" -> "`map`")) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org