Repository: spark
Updated Branches:
  refs/heads/master 39d538ddd -> bc2767df2


[SPARK-17374][SQL] Better error messages when parsing JSON using DataFrameReader

## What changes were proposed in this pull request?

This PR adds better error messages for malformed records when reading a JSON 
file using DataFrameReader.

For example, for query:
```
import org.apache.spark.sql.types._
val corruptRecords = spark.sparkContext.parallelize("""{"a":{, b:3}""" :: Nil)
val schema = StructType(StructField("a", StringType, true) :: Nil)
val jsonDF = spark.read.schema(schema).json(corruptRecords)
```

**Before change:**
We silently replace the corrupted line with null
```
scala> jsonDF.show
+----+
|   a|
+----+
|null|
+----+
```

**After change:**
Add an explicit warning message:
```
scala> jsonDF.show
16/09/02 14:43:16 WARN JacksonParser: Found at least one malformed records 
(sample: {"a":{, b:3}). The JSON reader will replace
all malformed records with placeholder null in current PERMISSIVE parser mode.
To find out which corrupted records have been replaced with null, please use the
default inferred schema instead of providing a custom schema.

Code example to print all malformed records (scala):
===================================================
// The corrupted record exists in column _corrupt_record.
val parsedJson = spark.read.json("/path/to/json/file/test.json")

+----+
|   a|
+----+
|null|
+----+
```


## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzh...@databricks.com>

Closes #14929 from clockfly/logwarning_if_schema_not_contain_corrupted_record.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc2767df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc2767df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc2767df

Branch: refs/heads/master
Commit: bc2767df2666ff615e7f44e980555afab06dd8a3
Parents: 39d538d
Author: Sean Zhong <seanzh...@databricks.com>
Authored: Tue Sep 6 22:20:55 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Sep 6 22:20:55 2016 +0800

----------------------------------------------------------------------
 .../datasources/json/JacksonParser.scala        | 39 +++++++++++++++++++-
 .../execution/datasources/json/JsonSuite.scala  | 29 ++++++++++++++-
 2 files changed, 66 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc2767df/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 359a3e2..5ce1bf7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import 
org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, 
PERMISSIVE_MODE}
 import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +53,11 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
+  private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
+
+  @transient
+  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+
   /**
    * This function deals with the cases it fails to parse. This function will 
be called
    * when exceptions are caught during converting. This functions also deals 
with `mode` option.
@@ -62,8 +68,39 @@ class JacksonParser(
       throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
     }
     if (options.dropMalformed) {
-      logWarning(s"Dropping malformed line: $record")
+      if (!isWarningPrintedForMalformedRecord) {
+        logWarning(
+          s"""Found at least one malformed records (sample: $record). The JSON 
reader will drop
+             |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+             |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+             |mode and use the default inferred schema.
+             |
+             |Code example to print all malformed records (scala):
+             |===================================================
+             |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
+             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+             |
+           """.stripMargin)
+        isWarningPrintedForMalformedRecord = true
+      }
       Nil
+    } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
+      if (!isWarningPrintedForMalformedRecord) {
+        logWarning(
+          s"""Found at least one malformed records (sample: $record). The JSON 
reader will replace
+             |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+             |To find out which corrupted records have been replaced with 
null, please use the
+             |default inferred schema instead of providing a custom schema.
+             |
+             |Code example to print all malformed records (scala):
+             |===================================================
+             |// The corrupted record exists in column 
${columnNameOfCorruptRecord}.
+             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+             |
+           """.stripMargin)
+        isWarningPrintedForMalformedRecord = true
+      }
+      emptyRow
     } else {
       val row = new GenericMutableRow(schema.length)
       for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/bc2767df/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 63a9061..3d533c1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1081,7 +1081,34 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
     assert(jsonDFTwo.schema === schemaTwo)
   }
 
-  test("Corrupt records: PERMISSIVE mode") {
+  test("Corrupt records: PERMISSIVE mode, without designated column for 
malformed records") {
+    withTempView("jsonTable") {
+      val schema = StructType(
+        StructField("a", StringType, true) ::
+          StructField("b", StringType, true) ::
+          StructField("c", StringType, true) :: Nil)
+
+      val jsonDF = spark.read.schema(schema).json(corruptRecords)
+      jsonDF.createOrReplaceTempView("jsonTable")
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, b, c
+            |FROM jsonTable
+          """.stripMargin),
+        Seq(
+          // Corrupted records are replaced with null
+          Row(null, null, null),
+          Row(null, null, null),
+          Row(null, null, null),
+          Row("str_a_4", "str_b_4", "str_c_4"),
+          Row(null, null, null))
+      )
+    }
+  }
+
+  test("Corrupt records: PERMISSIVE mode, with designated column for malformed 
records") {
     // Test if we can query corrupt records.
     withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
       withTempView("jsonTable") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to