Repository: spark
Updated Branches:
  refs/heads/master c391dc65e -> 03e82e368


[SPARK-25040][SQL] Empty string for non string types should be disallowed

## What changes were proposed in this pull request?

This takes over the original PR at #22019. The original proposal was to return 
null for float and double types; a later, more reasonable proposal is to 
disallow empty strings altogether. This patch adds logic to throw an exception 
when an empty string is found for a non-string type.
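
As a minimal sketch of the resulting behavior (a hypothetical spark-shell session on Spark 3.0, not part of this patch, so `spark` and its implicits are assumed to be in scope):

```scala
import org.apache.spark.SparkException

val ds = Seq("""{"a":""}""").toDS()

// Non-string type: an empty string is now rejected at parse time.
val intDf = spark.read.schema("a int").option("mode", "FAILFAST").json(ds)
try intDf.collect()
catch {
  // The failure message contains:
  //   Failed to parse an empty string for data type int
  case e: SparkException => println(e.getMessage)
}

// String type: an empty string is still a valid value.
spark.read.schema("a string").json(ds).collect()  // a single row holding ""
```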

## How was this patch tested?

Added a test.

Closes #22787 from viirya/SPARK-25040.

Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03e82e36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03e82e36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03e82e36

Branch: refs/heads/master
Commit: 03e82e36896afb43cc42c8d065ebe41a19ec62a7
Parents: c391dc6
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Tue Oct 23 13:43:53 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Tue Oct 23 13:43:53 2018 +0800

----------------------------------------------------------------------
 docs/sql-migration-guide-upgrade.md             |  2 ++
 .../spark/sql/catalyst/json/JacksonParser.scala | 19 +++++-----
 .../execution/datasources/json/JsonSuite.scala  | 37 +++++++++++++++++++-
 3 files changed, 48 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03e82e36/docs/sql-migration-guide-upgrade.md
----------------------------------------------------------------------
diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 68a897c..b8b9ad8 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -11,6 +11,8 @@ displayTitle: Spark SQL Upgrading Guide
 
  - In PySpark, when creating a `SparkSession` with `SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, the builder was trying to update the `SparkConf` of the existing `SparkContext` with configurations specified to the builder, but the `SparkContext` is shared by all `SparkSession`s, so we should not update them. Since 3.0, the builder no longer updates the configurations. This is the same behavior as the Java/Scala API in 2.3 and above. If you want to update them, you need to do so prior to creating a `SparkSession`.
 
+  - In Spark version 2.4 and earlier, the parser of the JSON data source treats empty strings as null for some data types, such as `IntegerType`. For `FloatType` and `DoubleType`, it fails on empty strings and throws exceptions. Since Spark 3.0, empty strings are disallowed and an exception is thrown for all data types except `StringType` and `BinaryType`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
 
  - In Spark version 2.3 and earlier, the second parameter to the `array_contains` function is implicitly promoted to the element type of the first, array-type parameter. This type promotion can be lossy and may cause `array_contains` to return a wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some changes in behavior, which are illustrated in the table below.
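
The new migration note above only describes the exception itself; how it surfaces depends on the parse mode. A hedged sketch (assuming Spark 3.0 in spark-shell; the new test below exercises only FAILFAST, so the PERMISSIVE behavior here follows from Spark's general handling of malformed records):

```scala
val ds = Seq("""{"a":""}""").toDS()

// FAILFAST: the query fails with
//   "Failed to parse an empty string for data type int".
// Default PERMISSIVE mode: the record counts as malformed, so the
// row is nulled out rather than failing the query.
spark.read.schema("a int").json(ds).show()
// +----+
// |   a|
// +----+
// |null|
// +----+
```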

http://git-wip-us.apache.org/repos/asf/spark/blob/03e82e36/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 984979a..918c9e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -168,7 +168,7 @@ class JacksonParser(
         case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
           parser.getFloatValue
 
-        case VALUE_STRING =>
+        case VALUE_STRING if parser.getTextLength >= 1 =>
           // Special case handling for NaN and Infinity.
           parser.getText match {
             case "NaN" => Float.NaN
@@ -184,7 +184,7 @@ class JacksonParser(
         case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
           parser.getDoubleValue
 
-        case VALUE_STRING =>
+        case VALUE_STRING if parser.getTextLength >= 1 =>
           // Special case handling for NaN and Infinity.
           parser.getText match {
             case "NaN" => Double.NaN
@@ -211,7 +211,7 @@ class JacksonParser(
 
     case TimestampType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
-        case VALUE_STRING =>
+        case VALUE_STRING if parser.getTextLength >= 1 =>
           val stringValue = parser.getText
           // This one will lose microseconds parts.
           // See https://issues.apache.org/jira/browse/SPARK-10681.
@@ -230,7 +230,7 @@ class JacksonParser(
 
     case DateType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
-        case VALUE_STRING =>
+        case VALUE_STRING if parser.getTextLength >= 1 =>
           val stringValue = parser.getText
           // This one will lose microseconds parts.
           // See https://issues.apache.org/jira/browse/SPARK-10681.
@@ -310,16 +310,17 @@ class JacksonParser(
   }
 
   /**
-   * This function throws an exception for failed conversion, but returns null for empty string,
-   * to guard the non string types.
+   * This function throws an exception for failed conversion. It also throws an exception for an
+   * empty string on any data type except string and binary types.
    */
   private def failedConversion[R >: Null](
       parser: JsonParser,
       dataType: DataType): PartialFunction[JsonToken, R] = {
+
+    // SPARK-25040: Disallow empty strings for data types except for string and binary types.
     case VALUE_STRING if parser.getTextLength < 1 =>
-      // If conversion is failed, this produces `null` rather than throwing exception.
-      // This will protect the mismatch of types.
-      null
+      throw new RuntimeException(
+        s"Failed to parse an empty string for data type ${dataType.catalogString}")
 
     case token =>
       // We cannot parse this token based on the given data type. So, we throw a
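
To make the mechanics above easier to follow: each type-specific converter is a PartialFunction whose string case is now guarded by `parser.getTextLength >= 1`, and `failedConversion` is the fallback that `parseJsonToken` applies when no type-specific case matches. An empty `VALUE_STRING` therefore falls through to the fallback, which now throws instead of returning null. A self-contained toy model of that dispatch (simplified names, not Spark's actual classes):

```scala
// Hypothetical stand-ins for Jackson's token stream, for illustration only.
sealed trait Token
case class ValueString(text: String) extends Token
case class ValueNumber(value: Double) extends Token

// Type-specific converter; the length guard mirrors
// `parser.getTextLength >= 1` in the real code.
val floatConverter: PartialFunction[Token, Float] = {
  case ValueNumber(v)                  => v.toFloat
  case ValueString("NaN")              => Float.NaN
  case ValueString(s) if s.length >= 1 => s.toFloat
}

// Fallback mirroring failedConversion: empty strings now throw
// instead of silently producing null.
def failedConversion(typeName: String): PartialFunction[Token, Float] = {
  case ValueString(s) if s.length < 1 =>
    throw new RuntimeException(
      s"Failed to parse an empty string for data type $typeName")
  case token =>
    throw new RuntimeException(s"Cannot parse $token as $typeName")
}

// parseJsonToken effectively chains the two:
val convert = floatConverter orElse failedConversion("float")

convert(ValueString("1.5"))  // 1.5f
convert(ValueString(""))     // RuntimeException: Failed to parse an empty string ...
```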

http://git-wip-us.apache.org/repos/asf/spark/blob/03e82e36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3e4cc8f..43e1a61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -249,7 +249,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     checkAnswer(
       sql("select nullstr, headers.Host from jsonTable"),
-      Seq(Row("", "1.abc.com"), Row("", null), Row("", null), Row(null, null))
+      Seq(Row("", "1.abc.com"), Row("", null), Row(null, null), Row(null, null))
     )
   }
 
@@ -2515,4 +2515,39 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkCount(2)
     countForMalformedJSON(0, Seq(""))
   }
+
+  test("SPARK-25040: empty strings should be disallowed") {
+    def failedOnEmptyString(dataType: DataType): Unit = {
+      val df = spark.read.schema(s"a ${dataType.catalogString}")
+        .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
+      val errMessage = intercept[SparkException] {
+        df.collect()
+      }.getMessage
+      assert(errMessage.contains(
+        s"Failed to parse an empty string for data type 
${dataType.catalogString}"))
+    }
+
+    def emptyString(dataType: DataType, expected: Any): Unit = {
+      val df = spark.read.schema(s"a ${dataType.catalogString}")
+        .option("mode", "FAILFAST").json(Seq("""{"a":""}""").toDS)
+      checkAnswer(df, Row(expected) :: Nil)
+    }
+
+    failedOnEmptyString(BooleanType)
+    failedOnEmptyString(ByteType)
+    failedOnEmptyString(ShortType)
+    failedOnEmptyString(IntegerType)
+    failedOnEmptyString(LongType)
+    failedOnEmptyString(FloatType)
+    failedOnEmptyString(DoubleType)
+    failedOnEmptyString(DecimalType.SYSTEM_DEFAULT)
+    failedOnEmptyString(TimestampType)
+    failedOnEmptyString(DateType)
+    failedOnEmptyString(ArrayType(IntegerType))
+    failedOnEmptyString(MapType(StringType, IntegerType, true))
+    failedOnEmptyString(StructType(StructField("f1", IntegerType, true) :: Nil))
+
+    emptyString(StringType, "")
+    emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
+  }
 }

