This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6f47ac43e55 [SPARK-42327][SQL] Assign name to `_LEGACY_ERROR_TEMP_2177` 6f47ac43e55 is described below commit 6f47ac43e55c332f63876cf4f8ecf1b41b277651 Author: itholic <haejoon....@databricks.com> AuthorDate: Mon Feb 13 21:33:53 2023 +0500 [SPARK-42327][SQL] Assign name to `_LEGACY_ERROR_TEMP_2177` ### What changes were proposed in this pull request? This PR proposes to assign the name `MALFORMED_RECORD_IN_PARSING` to `_LEGACY_ERROR_TEMP_2177` and improve the error message. ### Why are the changes needed? We should assign proper names to LEGACY errors, and show actionable error messages. ### Does this PR introduce _any_ user-facing change? No, but error message improvements. ### How was this patch tested? Updated UTs. Closes #39980 from itholic/LEGACY_2177. Authored-by: itholic <haejoon....@databricks.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- core/src/main/resources/error/error-classes.json | 11 ++-- .../sql/catalyst/util/FailureSafeParser.scala | 3 +- .../spark/sql/errors/QueryExecutionErrors.scala | 6 +- .../expressions/JsonExpressionsSuite.scala | 8 ++- .../org/apache/spark/sql/CsvFunctionsSuite.scala | 9 ++- .../org/apache/spark/sql/JsonFunctionsSuite.scala | 66 ++++++++++++++++------ .../sql/execution/datasources/csv/CSVSuite.scala | 6 +- .../sql/execution/datasources/json/JsonSuite.scala | 22 ++++++-- .../spark/sql/hive/thriftserver/CliSuite.scala | 4 +- .../ThriftServerWithSparkContextSuite.scala | 4 +- 10 files changed, 95 insertions(+), 44 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index e96383399d2..e329932acf1 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -1063,6 +1063,12 @@ "Malformed Protobuf messages are detected in message deserialization. Parse Mode: <failFastMode>. 
To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'." ] }, + "MALFORMED_RECORD_IN_PARSING" : { + "message" : [ + "Malformed records are detected in record parsing: <badRecord>.", + "Parse Mode: <failFastMode>. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'." + ] + }, "MISSING_AGGREGATION" : { "message" : [ "The non-aggregating expression <expression> is based on columns which are not participating in the GROUP BY clause.", @@ -4414,11 +4420,6 @@ "Cannot create array with <numElements> elements of data due to exceeding the limit <maxRoundedArrayLength> elements for ArrayData. <additionalErrorMessage>" ] }, - "_LEGACY_ERROR_TEMP_2177" : { - "message" : [ - "Malformed records are detected in record parsing. Parse Mode: <failFastMode>. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'." - ] - }, "_LEGACY_ERROR_TEMP_2178" : { "message" : [ "Remote operations not supported." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index 5a9e52a51a2..fcdcd21b6dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -65,7 +65,8 @@ class FailureSafeParser[IN]( case DropMalformedMode => Iterator.empty case FailFastMode => - throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(e) + throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( + toResultRow(e.partialResult(), e.record).toString, e) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 4134da135e3..fd3809ccd31 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1743,10 +1743,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { "additionalErrorMessage" -> additionalErrorMessage)) } - def malformedRecordsDetectedInRecordParsingError(e: BadRecordException): Throwable = { + def malformedRecordsDetectedInRecordParsingError( + badRecord: String, e: BadRecordException): Throwable = { new SparkException( - errorClass = "_LEGACY_ERROR_TEMP_2177", + errorClass = "MALFORMED_RECORD_IN_PARSING", messageParameters = Map( + "badRecord" -> badRecord, "failFastMode" -> FailFastMode.name), cause = e) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index beeb01619aa..a1db7e4c3ab 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -446,9 +446,11 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with InternalRow(null) ) }.getCause - assert(exception.isInstanceOf[SparkException]) - assert(exception.getMessage.contains( - "Malformed records are detected in record parsing. Parse Mode: FAILFAST")) + checkError( + exception = exception.asInstanceOf[SparkException], + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map("badRecord" -> "[null]", "failFastMode" -> "FAILFAST") + ) } test("from_json - input=array, schema=array, output=array") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index fcdc40404e7..67ba5511263 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -304,9 +304,12 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { val exception1 = intercept[SparkException] { df.select(from_csv($"value", schema, Map("mode" -> "FAILFAST"))).collect() - }.getMessage - assert(exception1.contains( - "Malformed records are detected in record parsing. 
Parse Mode: FAILFAST.")) + }.getCause + checkError( + exception = exception1.asInstanceOf[SparkException], + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map("badRecord" -> "[null,null,\"]", "failFastMode" -> "FAILFAST") + ) val exception2 = intercept[SparkException] { df.select(from_csv($"value", schema, Map("mode" -> "DROPMALFORMED"))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 57c54e88229..f2e0fd57738 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -749,9 +749,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { val exception1 = intercept[SparkException] { df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() - }.getMessage - assert(exception1.contains( - "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + }.getCause + checkError( + exception = exception1.asInstanceOf[SparkException], + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map( + "badRecord" -> "[null,null,{\"a\" 1, \"b\": 11}]", + "failFastMode" -> "FAILFAST") + ) val exception2 = intercept[SparkException] { df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))) @@ -778,10 +783,15 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { val exception = intercept[SparkException] { df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect() - } + }.getCause - assert(exception.getMessage.contains( - "Malformed records are detected in record parsing. 
Parse Mode: FAILFAST.")) + checkError( + exception = exception.asInstanceOf[SparkException], + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map( + "badRecord" -> "[null,11,{\"a\": \"1\", \"b\": 11}]", + "failFastMode" -> "FAILFAST") + ) checkError( exception = ExceptionUtils.getRootCause(exception).asInstanceOf[SparkRuntimeException], errorClass = "CANNOT_PARSE_JSON_FIELD", @@ -1107,15 +1117,25 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { val exception1 = intercept[SparkException] { df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))("b")).collect() - }.getMessage - assert(exception1.contains( - "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + }.getCause + checkError( + exception = exception1.asInstanceOf[SparkException], + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map( + "badRecord" -> "[null,null]", + "failFastMode" -> "FAILFAST") + ) val exception2 = intercept[SparkException] { df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))("a")).collect() - }.getMessage - assert(exception2.contains( - "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + }.getCause + checkError( + exception = exception2.asInstanceOf[SparkException], + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map( + "badRecord" -> "[null,null]", + "failFastMode" -> "FAILFAST") + ) } } } @@ -1131,15 +1151,25 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { val exception1 = intercept[SparkException] { df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))("b")).collect() - }.getMessage - assert(exception1.contains( - "Malformed records are detected in record parsing. 
Parse Mode: FAILFAST.")) + }.getCause + checkError( + exception = exception1.asInstanceOf[SparkException], + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map( + "badRecord" -> "[null]", + "failFastMode" -> "FAILFAST") + ) val exception2 = intercept[SparkException] { df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))("a")).collect() - }.getMessage - assert(exception2.contains( - "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + }.getCause + checkError( + exception = exception2.asInstanceOf[SparkException], + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map( + "badRecord" -> "[null]", + "failFastMode" -> "FAILFAST") + ) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 44f1b2faceb..4cc971a05af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -3218,8 +3218,10 @@ class CSVv1Suite extends CSVSuite { checkError( exception = exception.getCause.asInstanceOf[SparkException], - errorClass = "_LEGACY_ERROR_TEMP_2177", - parameters = Map("failFastMode" -> "FAILFAST") + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map( + "badRecord" -> "[2015,Chevy,Volt,null,null]", + "failFastMode" -> "FAILFAST") ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index efc062e927c..5595a9670ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1073,9 +1073,14 @@ abstract class JsonSuite .schema("a string") .json(corruptRecords) 
.collect() - }.getMessage - assert(exceptionTwo.contains( - "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) + }.getCause + checkError( + exception = exceptionTwo.asInstanceOf[SparkException], + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map( + "badRecord" -> "[null]", + "failFastMode" -> "FAILFAST") + ) } test("Corrupt records: DROPMALFORMED mode") { @@ -1989,9 +1994,14 @@ abstract class JsonSuite .schema(schema) .json(path) .collect() - } - assert(exceptionTwo.getMessage.contains("Malformed records are detected in record " + - "parsing. Parse Mode: FAILFAST.")) + }.getCause + checkError( + exception = exceptionTwo.asInstanceOf[SparkException], + errorClass = "MALFORMED_RECORD_IN_PARSING", + parameters = Map( + "badRecord" -> "[null]", + "failFastMode" -> "FAILFAST") + ) } } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 3ab6dcdd995..f73b1b8e68a 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -560,7 +560,7 @@ class CliSuite extends SparkFunSuite { extraArgs = Seq("--hiveconf", "hive.session.silent=false", "-e", "select from_json('a', 'a INT', map('mode', 'FAILFAST'));"), errorResponses = Seq("JsonParseException"))( - ("", "SparkException: Malformed records are detected in record parsing"), + ("", "SparkException: [MALFORMED_RECORD_IN_PARSING]"), ("", "JsonParseException: Unrecognized token 'a'")) // If it is in silent mode, will print the error message only runCliWithin( @@ -568,7 +568,7 @@ class CliSuite extends SparkFunSuite { extraArgs = Seq("--conf", "spark.hive.session.silent=true", "-e", "select from_json('a', 'a INT', map('mode', 'FAILFAST'));"), errorResponses = Seq("SparkException"))( - ("", 
"SparkException: Malformed records are detected in record parsing")) + ("", "SparkException: [MALFORMED_RECORD_IN_PARSING]")) } test("SPARK-30808: use Java 8 time API in Thrift SQL CLI by default") { diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 0228f0ac6d2..0c8a1d1260e 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -69,7 +69,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { } assert(e.getMessage.contains("JsonParseException: Unrecognized token 'a'")) assert(!e.getMessage.contains( - "SparkException: Malformed records are detected in record parsing")) + "SparkException: [MALFORMED_RECORD_IN_PARSING]")) } withJdbcStatement { statement => @@ -78,7 +78,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { } assert(e.getMessage.contains("JsonParseException: Unrecognized token 'a'")) assert(e.getMessage.contains( - "SparkException: Malformed records are detected in record parsing")) + "SparkException: [MALFORMED_RECORD_IN_PARSING]")) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org