This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ec6a3ae [SPARK-37176][SQL] Sync JsonInferSchema#infer method's exception handle logic with JacksonParser#parse method ec6a3ae is described below commit ec6a3ae6dff1dc9c63978ae14a1793ccd771ffff Author: Xianjin YE <yexianjin....@bytedance.com> AuthorDate: Tue Nov 2 12:40:09 2021 +0300 [SPARK-37176][SQL] Sync JsonInferSchema#infer method's exception handle logic with JacksonParser#parse method ### What changes were proposed in this pull request? Change `JsonInferSchema#infer`'s exception handle logic to be aligned with `JacksonParser#parse` ### Why are the changes needed? To reduce behavior inconsistency, users can have the same expectation for schema infer and json parse when dealing with some malformed input. ### Does this PR introduce _any_ user-facing change? Yes. Before this patch, json's schema inference could fail for some malformed input but succeed when parsing. After this patch, they have the same exception handle logic. ### How was this patch tested? Added one new test and modified one existing test to cover the new case. Closes #34455 from advancedxy/SPARK-37176. 
Authored-by: Xianjin YE <yexianjin....@bytedance.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../spark/sql/catalyst/json/JsonInferSchema.scala | 33 +++++++++++++++----- .../test/resources/test-data/malformed_utf8.json | 3 ++ .../sql/execution/datasources/json/JsonSuite.scala | 35 ++++++++++++++++++++++ 3 files changed, 63 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 3b17cde..3b62b16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.json +import java.io.CharConversionException +import java.nio.charset.MalformedInputException import java.util.Comparator import scala.util.control.Exception.allCatch @@ -45,6 +47,18 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { legacyFormat = FAST_DATE_FORMAT, isParsing = true) + private def handleJsonErrorsByParseMode(parseMode: ParseMode, + columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = { + parseMode match { + case PermissiveMode => + Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType)))) + case DropMalformedMode => + None + case FailFastMode => + throw QueryExecutionErrors.malformedRecordsDetectedInSchemaInferenceError(e) + } + } + /** * Infer the type of a collection of json records in three stages: * 1. 
Infer the type of each record @@ -68,14 +82,17 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { Some(inferField(parser)) } } catch { - case e @ (_: RuntimeException | _: JsonProcessingException) => parseMode match { - case PermissiveMode => - Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType)))) - case DropMalformedMode => - None - case FailFastMode => - throw QueryExecutionErrors.malformedRecordsDetectedInSchemaInferenceError(e) - } + case e @ (_: RuntimeException | _: JsonProcessingException | + _: MalformedInputException) => + handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, e) + case e: CharConversionException if options.encoding.isEmpty => + val msg = + """JSON parser cannot handle a character in its input. + |Specifying encoding as an input option explicitly might help to resolve the issue. + |""".stripMargin + e.getMessage + val wrappedCharException = new CharConversionException(msg) + wrappedCharException.initCause(e) + handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, wrappedCharException) } }.reduceOption(typeMerger).toIterator } diff --git a/sql/core/src/test/resources/test-data/malformed_utf8.json b/sql/core/src/test/resources/test-data/malformed_utf8.json new file mode 100644 index 0000000..c57eb43 --- /dev/null +++ b/sql/core/src/test/resources/test-data/malformed_utf8.json @@ -0,0 +1,3 @@ +{"a": 1} +{"a": 1} +� \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 3a30e1b..075d6e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2453,6 +2453,41 @@ abstract class JsonSuite checkAnswer( spark.read.option("mode", "PERMISSIVE").option("encoding", 
"UTF-8").json(Seq(badJson).toDS()), Row(badJson)) + checkAnswer( + // encoding auto detection should also be possible with json schema infer + spark.read.option("mode", "PERMISSIVE").json(Seq(badJson).toDS()), + Row(badJson)) + } + + test("SPARK-37176: inferring should be possible when parse mode is permissive") { + withTempPath { tempDir => + val path = tempDir.getAbsolutePath + // normal input to let spark correctly infer schema + val record = """{"a":1}""" + Seq(record, badJson + record).toDS().write.text(path) + val expected = s"""${badJson}{"a":1}""" + val df = spark.read.format("json") + .option("mode", "PERMISSIVE") + .load(path) + checkAnswer(df, Seq(Row(null, 1), Row(expected, null))) + } + } + + test("SPARK-37176: inferring should handle malformed input") { + val schema = new StructType().add("a", IntegerType) + val dfWithSchema = spark.read.format("json") + .option("mode", "DROPMALFORMED") + .option("encoding", "utf-8") + .schema(schema) + .load(testFile("test-data/malformed_utf8.json")) + checkAnswer(dfWithSchema, Seq(Row(1), Row(1))) + + val df = spark.read.format("json") + .option("mode", "DROPMALFORMED") + .option("encoding", "utf-8") + .load(testFile("test-data/malformed_utf8.json")) + + checkAnswer(df, Seq(Row(1), Row(1))) } test("SPARK-23772 ignore column of all null values or empty array during schema inference") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org