Repository: spark
Updated Branches:
  refs/heads/master d68f3a726 -> 17449a2e6


[SPARK-25952][SQL] Passing actual schema to JacksonParser

## What changes were proposed in this pull request?

This PR fixes an issue where the corrupt record column, specified via
`spark.sql.columnNameOfCorruptRecord` or the JSON option
`columnNameOfCorruptRecord`, was propagated to `JacksonParser`. The returned
row then broke the assumption in `FailureSafeParser` that the row contains only
actual data. The issue is fixed by passing the actual schema, without the
corrupt record field, to `JacksonParser`.

## How was this patch tested?

Added a test with the corrupt record column in the middle of the user's schema.

Closes #22958 from MaxGekk/from_json-corrupt-record-schema.

Authored-by: Maxim Gekk <max.g...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17449a2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17449a2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17449a2e

Branch: refs/heads/master
Commit: 17449a2e6b28ecce7a273284eab037e8aceb3611
Parents: d68f3a7
Author: Maxim Gekk <max.g...@gmail.com>
Authored: Thu Nov 8 14:48:23 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Thu Nov 8 14:48:23 2018 +0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/jsonExpressions.scala    | 14 ++++++++------
 .../org/apache/spark/sql/JsonFunctionsSuite.scala     | 13 +++++++++++++
 2 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/17449a2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index eafcb61..52d0677 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -569,14 +569,16 @@ case class JsonToStructs(
       throw new IllegalArgumentException(s"from_json() doesn't support the 
${mode.name} mode. " +
         s"Acceptable modes are ${PermissiveMode.name} and 
${FailFastMode.name}.")
     }
-    val rawParser = new JacksonParser(nullableSchema, parsedOptions, 
allowArrayAsStructs = false)
-    val createParser = CreateJacksonParser.utf8String _
-
-    val parserSchema = nullableSchema match {
-      case s: StructType => s
-      case other => StructType(StructField("value", other) :: Nil)
+    val (parserSchema, actualSchema) = nullableSchema match {
+      case s: StructType =>
+        (s, StructType(s.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord)))
+      case other =>
+        (StructType(StructField("value", other) :: Nil), other)
     }
 
+    val rawParser = new JacksonParser(actualSchema, parsedOptions, 
allowArrayAsStructs = false)
+    val createParser = CreateJacksonParser.utf8String _
+
     new FailureSafeParser[UTF8String](
       input => rawParser.parse(input, createParser, identity[UTF8String]),
       mode,

http://git-wip-us.apache.org/repos/asf/spark/blob/17449a2e/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 2b09782..d6b7338 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -578,4 +578,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
           "Acceptable modes are PERMISSIVE and FAILFAST."))
     }
   }
+
+  test("corrupt record column in the middle") {
+    val schema = new StructType()
+      .add("a", IntegerType)
+      .add("_unparsed", StringType)
+      .add("b", IntegerType)
+    val badRec = """{"a" 1, "b": 11}"""
+    val df = Seq(badRec, """{"a": 2, "b": 12}""").toDS()
+
+    checkAnswer(
+      df.select(from_json($"value", schema, Map("columnNameOfCorruptRecord" -> 
"_unparsed"))),
+      Row(Row(null, badRec, null)) :: Row(Row(2, null, 12)) :: Nil)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to