Repository: spark Updated Branches: refs/heads/master d68f3a726 -> 17449a2e6
[SPARK-25952][SQL] Passing actual schema to JacksonParser ## What changes were proposed in this pull request? The PR fixes an issue where the corrupt record column, specified via `spark.sql.columnNameOfCorruptRecord` or the JSON option `columnNameOfCorruptRecord`, is propagated to `JacksonParser`, and the returned row breaks the assumption in `FailureSafeParser` that the row must contain only actual data. The issue is fixed by passing the actual schema, without the corrupt record field, into `JacksonParser`. ## How was this patch tested? Added a test with the corrupt record column in the middle of the user's schema. Closes #22958 from MaxGekk/from_json-corrupt-record-schema. Authored-by: Maxim Gekk <max.g...@gmail.com> Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17449a2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17449a2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17449a2e Branch: refs/heads/master Commit: 17449a2e6b28ecce7a273284eab037e8aceb3611 Parents: d68f3a7 Author: Maxim Gekk <max.g...@gmail.com> Authored: Thu Nov 8 14:48:23 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Thu Nov 8 14:48:23 2018 +0800 ---------------------------------------------------------------------- .../sql/catalyst/expressions/jsonExpressions.scala | 14 ++++++++------ .../org/apache/spark/sql/JsonFunctionsSuite.scala | 13 +++++++++++++ 2 files changed, 21 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/17449a2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index eafcb61..52d0677 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -569,14 +569,16 @@ case class JsonToStructs( throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.") } - val rawParser = new JacksonParser(nullableSchema, parsedOptions, allowArrayAsStructs = false) - val createParser = CreateJacksonParser.utf8String _ - - val parserSchema = nullableSchema match { - case s: StructType => s - case other => StructType(StructField("value", other) :: Nil) + val (parserSchema, actualSchema) = nullableSchema match { + case s: StructType => + (s, StructType(s.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))) + case other => + (StructType(StructField("value", other) :: Nil), other) } + val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false) + val createParser = CreateJacksonParser.utf8String _ + new FailureSafeParser[UTF8String]( input => rawParser.parse(input, createParser, identity[UTF8String]), mode, http://git-wip-us.apache.org/repos/asf/spark/blob/17449a2e/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 2b09782..d6b7338 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -578,4 +578,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { "Acceptable modes are PERMISSIVE and FAILFAST.")) } 
} + + test("corrupt record column in the middle") { + val schema = new StructType() + .add("a", IntegerType) + .add("_unparsed", StringType) + .add("b", IntegerType) + val badRec = """{"a" 1, "b": 11}""" + val df = Seq(badRec, """{"a": 2, "b": 12}""").toDS() + + checkAnswer( + df.select(from_json($"value", schema, Map("columnNameOfCorruptRecord" -> "_unparsed"))), + Row(Row(null, badRec, null)) :: Row(Row(2, null, 12)) :: Nil) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org