Repository: spark
Updated Branches:
  refs/heads/master 520d92a19 -> 6273a711b


[SPARK-21610][SQL] Corrupt records are not handled properly when creating a dataframe from a file

## What changes were proposed in this pull request?
```
echo '{"field": 1}
{"field": 2}
{"field": "3"}' >/tmp/sample.json
```

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("field", ByteType)
  .add("_corrupt_record", StringType)

val file = "/tmp/sample.json"

val dfFromFile = spark.read.schema(schema).json(file)

scala> dfFromFile.show(false)
+-----+---------------+
|field|_corrupt_record|
+-----+---------------+
|1    |null           |
|2    |null           |
|null |{"field": "3"} |
+-----+---------------+

scala> dfFromFile.filter($"_corrupt_record".isNotNull).count()
res1: Long = 0

scala> dfFromFile.filter($"_corrupt_record".isNull).count()
res2: Long = 3
```
When the `requiredSchema` contains only `_corrupt_record`, the derived `actualSchema` is empty, so `_corrupt_record` is null for every row. This PR detects that situation and raises an exception with a workaround message so that users know what happened and how to fix the query.
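The documented workaround is to cache or save the parsed results before querying only the corrupt record column. A minimal sketch, reusing the `schema` and `file` from the example above (the counts follow from the three sample records, where only `{"field": "3"}` fails to parse as `ByteType`):

```scala
// Workaround: materialize the fully parsed result first, then query the
// corrupt record column from the cached DataFrame.
val df = spark.read.schema(schema).json(file).cache()

df.filter($"_corrupt_record".isNotNull).count()  // 1
df.filter($"_corrupt_record".isNull).count()     // 2
df.select("_corrupt_record").show(false)
```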

## How was this patch tested?

Added test case.

Author: Jen-Ming Chung <jenmingi...@gmail.com>

Closes #18865 from jmchung/SPARK-21610.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6273a711
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6273a711
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6273a711

Branch: refs/heads/master
Commit: 6273a711b69139ef0210f59759030a0b4a26b118
Parents: 520d92a
Author: Jen-Ming Chung <jenmingi...@gmail.com>
Authored: Sun Sep 10 17:26:43 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Sun Sep 10 17:26:43 2017 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  4 +++
 .../datasources/json/JsonFileFormat.scala       | 14 ++++++++++
 .../execution/datasources/json/JsonSuite.scala  | 29 ++++++++++++++++++++
 3 files changed, 47 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6273a711/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 45ba4d1..0a8acbb 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1543,6 +1543,10 @@ options.
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.2 to 2.3
+
+  - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
+
 ## Upgrading From Spark SQL 2.1 to 2.2
 
   - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table's first access.

http://git-wip-us.apache.org/repos/asf/spark/blob/6273a711/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 53d62d8..b5ed6e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -113,6 +113,20 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       }
     }
 
+    if (requiredSchema.length == 1 &&
+      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed 
when the\n" +
+        "referenced columns only include the internal corrupt record column\n" 
+
+        s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For 
example:\n" +
+        
"spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n"
 +
+        "and 
spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
+        "Instead, you can cache or save the parsed results and then send the 
same query.\n" +
+        "For example, val df = spark.read.schema(schema).json(file).cache() 
and then\n" +
+        "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+
     (file: PartitionedFile) => {
       val parser = new JacksonParser(actualSchema, parsedOptions)
       JsonDataSource(parsedOptions).readFile(

http://git-wip-us.apache.org/repos/asf/spark/blob/6273a711/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0008954..8c8d41e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2034,4 +2034,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       }
     }
   }
+
+  test("SPARK-21610: Corrupt records are not handled properly when creating a 
dataframe " +
+    "from a file") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val data =
+        """{"field": 1}
+          |{"field": 2}
+          |{"field": "3"}""".stripMargin
+      Seq(data).toDF().repartition(1).write.text(path)
+      val schema = new StructType().add("field", 
ByteType).add("_corrupt_record", StringType)
+      // negative cases
+      val msg = intercept[AnalysisException] {
+        spark.read.schema(schema).json(path).select("_corrupt_record").collect()
+      }.getMessage
+      assert(msg.contains("only include the internal corrupt record column"))
+      intercept[catalyst.errors.TreeNodeException[_]] {
+        spark.read.schema(schema).json(path).filter($"_corrupt_record".isNotNull).count()
+      }
+      // workaround
+      val df = spark.read.schema(schema).json(path).cache()
+      assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
+      assert(df.filter($"_corrupt_record".isNull).count() == 2)
+      checkAnswer(
+        df.select("_corrupt_record"),
+        Row(null) :: Row(null) :: Row("{\"field\": \"3\"}") :: Nil
+      )
+    }
+  }
 }

