Repository: spark
Updated Branches:
  refs/heads/master b270bccff -> ab06c2535


[SPARK-24391][SQL] Support arrays of any types by from_json

## What changes were proposed in this pull request?

The PR removes the restriction on the element type of a root-level array
schema in `from_json`. Currently, the function can handle only arrays of
structs; even an array of a primitive type is disallowed. The PR allows arrays
of any element type currently supported by the JSON datasource. Here is an
example with an array of a primitive type:

```
scala> import org.apache.spark.sql.functions._
scala> val df = Seq("[1, 2, 3]").toDF("a")
scala> val schema = new ArrayType(IntegerType, false)
scala> val arr = df.select(from_json($"a", schema))
scala> arr.printSchema
root
 |-- jsontostructs(a): array (nullable = true)
 |    |-- element: integer (containsNull = true)
```
and the result of converting the JSON string to the `ArrayType`:
```
scala> arr.show
+----------------+
|jsontostructs(a)|
+----------------+
|       [1, 2, 3]|
+----------------+
```

## How was this patch tested?

I added a few positive and negative tests:
- array of primitive types
- array of arrays
- array of structs
- array of maps

Closes #21439 from MaxGekk/from_json-array.

Lead-authored-by: Maxim Gekk <maxim.g...@databricks.com>
Co-authored-by: Maxim Gekk <max.g...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab06c253
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab06c253
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab06c253

Branch: refs/heads/master
Commit: ab06c25350f8a997bef0c3dd8aa82b709e7dfb3f
Parents: b270bcc
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Mon Aug 13 20:13:09 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Mon Aug 13 20:13:09 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 |  7 +-
 .../catalyst/expressions/jsonExpressions.scala  | 19 ++---
 .../spark/sql/catalyst/json/JacksonParser.scala | 30 ++++++++
 .../scala/org/apache/spark/sql/functions.scala  | 10 +--
 .../sql-tests/inputs/json-functions.sql         | 12 ++++
 .../sql-tests/results/json-functions.sql.out    | 66 ++++++++++++++++-
 .../apache/spark/sql/JsonFunctionsSuite.scala   | 76 ++++++++++++++++++--
 7 files changed, 194 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ab06c253/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index eaecf28..f583373 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2241,7 +2241,7 @@ def json_tuple(col, *fields):
 def from_json(col, schema, options={}):
     """
     Parses a column containing a JSON string into a :class:`MapType` with 
:class:`StringType`
-    as keys type, :class:`StructType` or :class:`ArrayType` of 
:class:`StructType`\\s with
+    as keys type, :class:`StructType` or :class:`ArrayType` with
     the specified schema. Returns `null`, in the case of an unparseable string.
 
     :param col: string column in json format
@@ -2269,6 +2269,11 @@ def from_json(col, schema, options={}):
     >>> schema = schema_of_json(lit('''{"a": 0}'''))
     >>> df.select(from_json(df.value, schema).alias("json")).collect()
     [Row(json=Row(a=1))]
+    >>> data = [(1, '''[1, 2, 3]''')]
+    >>> schema = ArrayType(IntegerType())
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    [Row(json=[1, 2, 3])]
     """
 
     sc = SparkContext._active_spark_context

http://git-wip-us.apache.org/repos/asf/spark/blob/ab06c253/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index abe8875..ca99100 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -495,7 +495,7 @@ case class JsonTuple(children: Seq[Expression])
 }
 
 /**
- * Converts an json input string to a [[StructType]] or [[ArrayType]] of 
[[StructType]]s
+ * Converts an json input string to a [[StructType]], [[ArrayType]] or 
[[MapType]]
  * with the specified schema.
  */
 // scalastyle:off line.size.limit
@@ -544,17 +544,10 @@ case class JsonToStructs(
       timeZoneId = None)
 
   override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
-    case _: StructType | ArrayType(_: StructType, _) | _: MapType =>
+    case _: StructType | _: ArrayType | _: MapType =>
       super.checkInputDataTypes()
     case _ => TypeCheckResult.TypeCheckFailure(
-      s"Input schema ${nullableSchema.catalogString} must be a struct or an 
array of structs.")
-  }
-
-  @transient
-  lazy val rowSchema = nullableSchema match {
-    case st: StructType => st
-    case ArrayType(st: StructType, _) => st
-    case mt: MapType => mt
+      s"Input schema ${nullableSchema.catalogString} must be a struct, an 
array or a map.")
   }
 
   // This converts parsed rows to the desired output by the given schema.
@@ -562,8 +555,8 @@ case class JsonToStructs(
   lazy val converter = nullableSchema match {
     case _: StructType =>
       (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
-    case ArrayType(_: StructType, _) =>
-      (rows: Seq[InternalRow]) => new GenericArrayData(rows)
+    case _: ArrayType =>
+      (rows: Seq[InternalRow]) => rows.head.getArray(0)
     case _: MapType =>
       (rows: Seq[InternalRow]) => rows.head.getMap(0)
   }
@@ -571,7 +564,7 @@ case class JsonToStructs(
   @transient
   lazy val parser =
     new JacksonParser(
-      rowSchema,
+      nullableSchema,
       new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
 
   override def dataType: DataType = nullableSchema

http://git-wip-us.apache.org/repos/asf/spark/blob/ab06c253/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 4d409ca..6feea50 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -61,6 +61,7 @@ class JacksonParser(
     dt match {
       case st: StructType => makeStructRootConverter(st)
       case mt: MapType => makeMapRootConverter(mt)
+      case at: ArrayType => makeArrayRootConverter(at)
     }
   }
 
@@ -101,6 +102,35 @@ class JacksonParser(
     }
   }
 
+  private def makeArrayRootConverter(at: ArrayType): JsonParser => 
Seq[InternalRow] = {
+    val elemConverter = makeConverter(at.elementType)
+    (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, at) {
+      case START_ARRAY => Seq(InternalRow(convertArray(parser, elemConverter)))
+      case START_OBJECT if at.elementType.isInstanceOf[StructType] =>
+        // This handles the case when an input JSON object is a structure but
+        // the specified schema is an array of structures. In that case, the 
input JSON is
+        // considered as an array of only one element of struct type.
+        // This behavior was introduced by changes for SPARK-19595.
+        //
+        // For example, if the specified schema is ArrayType(new 
StructType().add("i", IntegerType))
+        // and JSON input as below:
+        //
+        // [{"i": 1}, {"i": 2}]
+        // [{"i": 3}]
+        // {"i": 4}
+        //
+        // The last row is considered as an array with one element, and result 
of conversion:
+        //
+        // Seq(Row(1), Row(2))
+        // Seq(Row(3))
+        // Seq(Row(4))
+        //
+        val st = at.elementType.asInstanceOf[StructType]
+        val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
+        Seq(InternalRow(new GenericArrayData(Seq(convertObject(parser, st, 
fieldConverters)))))
+    }
+  }
+
   /**
    * Create a converter which converts the JSON documents held by the 
`JsonParser`
    * to a value according to a desired schema.

http://git-wip-us.apache.org/repos/asf/spark/blob/ab06c253/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 310e428..5a6ed59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3339,7 +3339,7 @@ object functions {
 
   /**
    * (Scala-specific) Parses a column containing a JSON string into a 
`MapType` with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the 
specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3371,7 +3371,7 @@ object functions {
 
   /**
    * (Java-specific) Parses a column containing a JSON string into a `MapType` 
with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the 
specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3400,7 +3400,7 @@ object functions {
 
   /**
    * Parses a column containing a JSON string into a `MapType` with 
`StringType` as keys type,
-   * `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3414,7 +3414,7 @@ object functions {
 
   /**
    * (Java-specific) Parses a column containing a JSON string into a `MapType` 
with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the 
specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3431,7 +3431,7 @@ object functions {
 
   /**
    * (Scala-specific) Parses a column containing a JSON string into a 
`MapType` with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the 
specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.

http://git-wip-us.apache.org/repos/asf/spark/blob/ab06c253/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql 
b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 79fdd58..0cf370c 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -39,3 +39,15 @@ select from_json('{"a":1, "b":"2"}', 
'struct<a:int,b:string>');
 -- infer schema of json literal
 select schema_of_json('{"c1":0, "c2":[1]}');
 select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'));
+
+-- from_json - array type
+select from_json('[1, 2, 3]', 'array<int>');
+select from_json('[1, "2", 3]', 'array<int>');
+select from_json('[1, 2, null]', 'array<int>');
+
+select from_json('[{"a": 1}, {"a":2}]', 'array<struct<a:int>>');
+select from_json('{"a": 1}', 'array<struct<a:int>>');
+select from_json('[null, {"a":2}]', 'array<struct<a:int>>');
+
+select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>');
+select from_json('[{"a": 1}, 2]', 'array<map<string,int>>');

http://git-wip-us.apache.org/repos/asf/spark/blob/ab06c253/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index 827931d..b44883b 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 30
+-- Number of queries: 38
 
 
 -- !query 0
@@ -290,3 +290,67 @@ select from_json('{"c1":[1, 2, 3]}', 
schema_of_json('{"c1":[0]}'))
 struct<jsontostructs({"c1":[1, 2, 3]}):struct<c1:array<bigint>>>
 -- !query 29 output
 {"c1":[1,2,3]}
+
+
+-- !query 30
+select from_json('[1, 2, 3]', 'array<int>')
+-- !query 30 schema
+struct<jsontostructs([1, 2, 3]):array<int>>
+-- !query 30 output
+[1,2,3]
+
+
+-- !query 31
+select from_json('[1, "2", 3]', 'array<int>')
+-- !query 31 schema
+struct<jsontostructs([1, "2", 3]):array<int>>
+-- !query 31 output
+NULL
+
+
+-- !query 32
+select from_json('[1, 2, null]', 'array<int>')
+-- !query 32 schema
+struct<jsontostructs([1, 2, null]):array<int>>
+-- !query 32 output
+[1,2,null]
+
+
+-- !query 33
+select from_json('[{"a": 1}, {"a":2}]', 'array<struct<a:int>>')
+-- !query 33 schema
+struct<jsontostructs([{"a": 1}, {"a":2}]):array<struct<a:int>>>
+-- !query 33 output
+[{"a":1},{"a":2}]
+
+
+-- !query 34
+select from_json('{"a": 1}', 'array<struct<a:int>>')
+-- !query 34 schema
+struct<jsontostructs({"a": 1}):array<struct<a:int>>>
+-- !query 34 output
+[{"a":1}]
+
+
+-- !query 35
+select from_json('[null, {"a":2}]', 'array<struct<a:int>>')
+-- !query 35 schema
+struct<jsontostructs([null, {"a":2}]):array<struct<a:int>>>
+-- !query 35 output
+[null,{"a":2}]
+
+
+-- !query 36
+select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>')
+-- !query 36 schema
+struct<jsontostructs([{"a": 1}, {"b":2}]):array<map<string,int>>>
+-- !query 36 output
+[{"a":1},{"b":2}]
+
+
+-- !query 37
+select from_json('[{"a": 1}, 2]', 'array<map<string,int>>')
+-- !query 37 schema
+struct<jsontostructs([{"a": 1}, 2]):array<map<string,int>>>
+-- !query 37 output
+NULL

http://git-wip-us.apache.org/repos/asf/spark/blob/ab06c253/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index d3b2701..f321ab8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -133,15 +133,11 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
       Row(null) :: Nil)
   }
 
-  test("from_json invalid schema") {
+  test("from_json - json doesn't conform to the array type") {
     val df = Seq("""{"a" 1}""").toDS()
     val schema = ArrayType(StringType)
-    val message = intercept[AnalysisException] {
-      df.select(from_json($"value", schema))
-    }.getMessage
 
-    assert(message.contains(
-      "Input schema array<string> must be a struct or an array of structs."))
+    checkAnswer(df.select(from_json($"value", schema)), Seq(Row(null)))
   }
 
   test("from_json array support") {
@@ -405,4 +401,72 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
     assert(out.schema == expected)
   }
+
+  test("from_json - array of primitive types") {
+    val df = Seq("[1, 2, 3]").toDF("a")
+    val schema = new ArrayType(IntegerType, false)
+
+    checkAnswer(df.select(from_json($"a", schema)), Seq(Row(Array(1, 2, 3))))
+  }
+
+  test("from_json - array of primitive types - malformed row") {
+    val df = Seq("[1, 2 3]").toDF("a")
+    val schema = new ArrayType(IntegerType, false)
+
+    checkAnswer(df.select(from_json($"a", schema)), Seq(Row(null)))
+  }
+
+  test("from_json - array of arrays") {
+    val jsonDF = Seq("[[1], [2, 3], [4, 5, 6]]").toDF("a")
+    val schema = new ArrayType(ArrayType(IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as 
"json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(
+      sql("select json[0][0], json[1][1], json[2][2] from jsonTable"),
+      Seq(Row(1, 3, 6)))
+  }
+
+  test("from_json - array of arrays - malformed row") {
+    val jsonDF = Seq("[[1], [2, 3], 4, 5, 6]]").toDF("a")
+    val schema = new ArrayType(ArrayType(IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as 
"json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(sql("select json[0] from jsonTable"), Seq(Row(null)))
+  }
+
+  test("from_json - array of structs") {
+    val jsonDF = Seq("""[{"a":1}, {"a":2}, {"a":3}]""").toDF("a")
+    val schema = new ArrayType(new StructType().add("a", IntegerType), false)
+    jsonDF.select(from_json($"a", schema) as 
"json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(
+      sql("select json[0], json[1], json[2] from jsonTable"),
+      Seq(Row(Row(1), Row(2), Row(3))))
+  }
+
+  test("from_json - array of structs - malformed row") {
+    val jsonDF = Seq("""[{"a":1}, {"a:2}, {"a":3}]""").toDF("a")
+    val schema = new ArrayType(new StructType().add("a", IntegerType), false)
+    jsonDF.select(from_json($"a", schema) as 
"json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(sql("select json[0], json[1]from jsonTable"), Seq(Row(null, 
null)))
+  }
+
+  test("from_json - array of maps") {
+    val jsonDF = Seq("""[{"a":1}, {"b":2}]""").toDF("a")
+    val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as 
"json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(
+      sql("""select json[0], json[1] from jsonTable"""),
+      Seq(Row(Map("a" -> 1), Map("b" -> 2))))
+  }
+
+  test("from_json - array of maps - malformed row") {
+    val jsonDF = Seq("""[{"a":1} "b":2}]""").toDF("a")
+    val schema = new ArrayType(MapType(StringType, IntegerType, false), false)
+    jsonDF.select(from_json($"a", schema) as 
"json").createOrReplaceTempView("jsonTable")
+
+    checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to