Repository: spark
Updated Branches:
  refs/heads/master 990af630d -> 0cdcf9114


[SPARK-19849][SQL] Support ArrayType in to_json to produce JSON array

## What changes were proposed in this pull request?

This PR proposes to support an array of struct type in `to_json` as below:

```scala
import org.apache.spark.sql.functions._

val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a")
df.select(to_json($"a").as("json")).show()
```

```
+----------+
|      json|
+----------+
|[{"_1":1}]|
+----------+
```

Currently, it throws an exception as below (a newline manually inserted for 
readability):

```
org.apache.spark.sql.AnalysisException: cannot resolve 'structtojson(`array`)' 
due to data type
mismatch: structtojson requires that the expression is a struct expression.;;
```

This allows the roundtrip with `from_json` as below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val df = Seq("""[{"a":1}, {"a":2}]""").toDF("json").select(from_json($"json", 
schema).as("array"))
df.show()

// Read back.
df.select(to_json($"array").as("json")).show()
```

```
+----------+
|     array|
+----------+
|[[1], [2]]|
+----------+

+-----------------+
|             json|
+-----------------+
|[{"a":1},{"a":2}]|
+-----------------+
```

Also, this PR proposes to rename from `StructToJson` to `StructsToJson ` and 
`JsonToStruct` to `JsonToStructs`.

## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite` for Scala, 
doctest for Python and test in `test_sparkSQL.R` for R.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #17192 from HyukjinKwon/SPARK-19849.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cdcf911
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cdcf911
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cdcf911

Branch: refs/heads/master
Commit: 0cdcf9114527a2c359c25e46fd6556b3855bfb28
Parents: 990af63
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Sun Mar 19 22:33:01 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Sun Mar 19 22:33:01 2017 -0700

----------------------------------------------------------------------
 R/pkg/R/functions.R                             | 18 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R       |  4 +
 python/pyspark/sql/functions.py                 | 15 ++-
 .../catalyst/analysis/FunctionRegistry.scala    |  4 +-
 .../catalyst/expressions/jsonExpressions.scala  | 70 +++++++++-----
 .../sql/catalyst/json/JacksonGenerator.scala    | 23 +++--
 .../expressions/JsonExpressionsSuite.scala      | 77 +++++++++++-----
 .../scala/org/apache/spark/sql/functions.scala  | 34 ++++---
 .../sql-tests/inputs/json-functions.sql         |  1 +
 .../sql-tests/results/json-functions.sql.out    | 96 +++++++++++---------
 .../apache/spark/sql/JsonFunctionsSuite.scala   | 26 +++++-
 11 files changed, 236 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 9867f2d..2cff3ac 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -1795,10 +1795,10 @@ setMethod("to_date",
 
 #' to_json
 #'
-#' Converts a column containing a \code{structType} into a Column of JSON 
string.
-#' Resolving the Column can fail if an unsupported type is encountered.
+#' Converts a column containing a \code{structType} or array of 
\code{structType} into a Column
+#' of JSON string. Resolving the Column can fail if an unsupported type is 
encountered.
 #'
-#' @param x Column containing the struct
+#' @param x Column containing the struct or array of the structs
 #' @param ... additional named properties to control how it is converted, 
accepts the same options
 #'            as the JSON data source.
 #'
@@ -1809,8 +1809,13 @@ setMethod("to_date",
 #' @export
 #' @examples
 #' \dontrun{
-#' to_json(df$t, dateFormat = 'dd/MM/yyyy')
-#' select(df, to_json(df$t))
+#' # Converts a struct into a JSON object
+#' df <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
+#' select(df, to_json(df$d, dateFormat = 'dd/MM/yyyy'))
+#'
+#' # Converts an array of structs into a JSON array
+#' df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 
'Alice')) as people")
+#' select(df, to_json(df$people))
 #'}
 #' @note to_json since 2.2.0
 setMethod("to_json", signature(x = "Column"),
@@ -2433,7 +2438,8 @@ setMethod("date_format", signature(y = "Column", x = 
"character"),
 #' from_json
 #'
 #' Parses a column containing a JSON string into a Column of \code{structType} 
with the specified
-#' \code{schema}. If the string is unparseable, the Column will contains the 
value NA.
+#' \code{schema} or array of \code{structType} if \code{asJsonArray} is set to 
\code{TRUE}.
+#' If the string is unparseable, the Column will contains the value NA.
 #'
 #' @param x Column containing the JSON string.
 #' @param schema a structType object to use as the schema to use when parsing 
the JSON string.

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 32856b3..9c38e0d 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1340,6 +1340,10 @@ test_that("column functions", {
   expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)
 
   # Test to_json(), from_json()
+  df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 
'Alice')) as people")
+  j <- collect(select(df, alias(to_json(df$people), "json")))
+  expect_equal(j[order(j$json), ][1], 
"[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
+
   df <- read.json(mapTypeJsonPath)
   j <- collect(select(df, alias(to_json(df$info), "json")))
   expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 376b86e..f9121e6 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1774,10 +1774,11 @@ def json_tuple(col, *fields):
 def from_json(col, schema, options={}):
     """
     Parses a column containing a JSON string into a [[StructType]] or 
[[ArrayType]]
-    with the specified schema. Returns `null`, in the case of an unparseable 
string.
+    of [[StructType]]s with the specified schema. Returns `null`, in the case 
of an unparseable
+    string.
 
     :param col: string column in json format
-    :param schema: a StructType or ArrayType to use when parsing the json 
column
+    :param schema: a StructType or ArrayType of StructType to use when parsing 
the json column
     :param options: options to control parsing. accepts the same options as 
the json datasource
 
     >>> from pyspark.sql.types import *
@@ -1802,10 +1803,10 @@ def from_json(col, schema, options={}):
 @since(2.1)
 def to_json(col, options={}):
     """
-    Converts a column containing a [[StructType]] into a JSON string. Throws 
an exception,
-    in the case of an unsupported type.
+    Converts a column containing a [[StructType]] or [[ArrayType]] of 
[[StructType]]s into a
+    JSON string. Throws an exception, in the case of an unsupported type.
 
-    :param col: name of column containing the struct
+    :param col: name of column containing the struct or array of the structs
     :param options: options to control converting. accepts the same options as 
the json datasource
 
     >>> from pyspark.sql import Row
@@ -1814,6 +1815,10 @@ def to_json(col, options={}):
     >>> df = spark.createDataFrame(data, ("key", "value"))
     >>> df.select(to_json(df.value).alias("json")).collect()
     [Row(json=u'{"age":2,"name":"Alice"}')]
+    >>> data = [(1, [Row(name='Alice', age=2), Row(name='Bob', age=3)])]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')]
     """
 
     sc = SparkContext._active_spark_context

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 0486e67..e1d83a8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -425,8 +425,8 @@ object FunctionRegistry {
     expression[BitwiseXor]("^"),
 
     // json
-    expression[StructToJson]("to_json"),
-    expression[JsonToStruct]("from_json"),
+    expression[StructsToJson]("to_json"),
+    expression[JsonToStructs]("from_json"),
 
     // Cast aliases (SPARK-16730)
     castAlias("boolean", BooleanType),

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 37e4bb5..e4e08a8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -29,7 +29,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
GenericArrayData, ParseModes}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, ParseModes}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -482,7 +482,8 @@ case class JsonTuple(children: Seq[Expression])
 }
 
 /**
- * Converts an json input string to a [[StructType]] or [[ArrayType]] with the 
specified schema.
+ * Converts an json input string to a [[StructType]] or [[ArrayType]] of 
[[StructType]]s
+ * with the specified schema.
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
@@ -495,7 +496,7 @@ case class JsonTuple(children: Seq[Expression])
        {"time":"2015-08-26 00:00:00.0"}
   """)
 // scalastyle:on line.size.limit
-case class JsonToStruct(
+case class JsonToStructs(
     schema: DataType,
     options: Map[String, String],
     child: Expression,
@@ -590,7 +591,7 @@ case class JsonToStruct(
 }
 
 /**
- * Converts a [[StructType]] to a json output string.
+ * Converts a [[StructType]] or [[ArrayType]] of [[StructType]]s to a json 
output string.
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
@@ -601,9 +602,11 @@ case class JsonToStruct(
        {"a":1,"b":2}
       > SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 
'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
        {"time":"26/08/2015"}
+      > SELECT _FUNC_(array(named_struct('a', 1, 'b', 2));
+       [{"a":1,"b":2}]
   """)
 // scalastyle:on line.size.limit
-case class StructToJson(
+case class StructsToJson(
     options: Map[String, String],
     child: Expression,
     timeZoneId: Option[String] = None)
@@ -624,41 +627,58 @@ case class StructToJson(
   lazy val writer = new CharArrayWriter()
 
   @transient
-  lazy val gen =
-    new JacksonGenerator(
-      child.dataType.asInstanceOf[StructType],
-      writer,
-      new JSONOptions(options, timeZoneId.get))
+  lazy val gen = new JacksonGenerator(
+    rowSchema, writer, new JSONOptions(options, timeZoneId.get))
+
+  @transient
+  lazy val rowSchema = child.dataType match {
+    case st: StructType => st
+    case ArrayType(st: StructType, _) => st
+  }
+
+  // This converts rows to the JSON output according to the given schema.
+  @transient
+  lazy val converter: Any => UTF8String = {
+    def getAndReset(): UTF8String = {
+      gen.flush()
+      val json = writer.toString
+      writer.reset()
+      UTF8String.fromString(json)
+    }
+
+    child.dataType match {
+      case _: StructType =>
+        (row: Any) =>
+          gen.write(row.asInstanceOf[InternalRow])
+          getAndReset()
+      case ArrayType(_: StructType, _) =>
+        (arr: Any) =>
+          gen.write(arr.asInstanceOf[ArrayData])
+          getAndReset()
+    }
+  }
 
   override def dataType: DataType = StringType
 
-  override def checkInputDataTypes(): TypeCheckResult = {
-    if (StructType.acceptsType(child.dataType)) {
+  override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
+    case _: StructType | ArrayType(_: StructType, _) =>
       try {
-        JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType])
+        JacksonUtils.verifySchema(rowSchema)
         TypeCheckResult.TypeCheckSuccess
       } catch {
         case e: UnsupportedOperationException =>
           TypeCheckResult.TypeCheckFailure(e.getMessage)
       }
-    } else {
-      TypeCheckResult.TypeCheckFailure(
-        s"$prettyName requires that the expression is a struct expression.")
-    }
+    case _ => TypeCheckResult.TypeCheckFailure(
+      s"Input type ${child.dataType.simpleString} must be a struct or array of 
structs.")
   }
 
   override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
     copy(timeZoneId = Option(timeZoneId))
 
-  override def nullSafeEval(row: Any): Any = {
-    gen.write(row.asInstanceOf[InternalRow])
-    gen.flush()
-    val json = writer.toString
-    writer.reset()
-    UTF8String.fromString(json)
-  }
+  override def nullSafeEval(value: Any): Any = converter(value)
 
-  override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
+  override def inputTypes: Seq[AbstractDataType] = TypeCollection(ArrayType, 
StructType) :: Nil
 }
 
 object JsonExprUtils {

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index dec5527..1d302ae 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -37,6 +37,10 @@ private[sql] class JacksonGenerator(
 
   // `ValueWriter`s for all fields of the schema
   private val rootFieldWriters: Array[ValueWriter] = 
schema.map(_.dataType).map(makeWriter).toArray
+  // `ValueWriter` for array data storing rows of the schema.
+  private val arrElementWriter: ValueWriter = (arr: SpecializedGetters, i: 
Int) => {
+    writeObject(writeFields(arr.getStruct(i, schema.length), schema, 
rootFieldWriters))
+  }
 
   private val gen = new 
JsonFactory().createGenerator(writer).setRootValueSeparator(null)
 
@@ -185,17 +189,18 @@ private[sql] class JacksonGenerator(
   def flush(): Unit = gen.flush()
 
   /**
-   * Transforms a single InternalRow to JSON using Jackson
+   * Transforms a single `InternalRow` to JSON object using Jackson
    *
    * @param row The row to convert
    */
-  def write(row: InternalRow): Unit = {
-    writeObject {
-      writeFields(row, schema, rootFieldWriters)
-    }
-  }
+  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, 
rootFieldWriters))
 
-  def writeLineEnding(): Unit = {
-    gen.writeRaw('\n')
-  }
+  /**
+   * Transforms multiple `InternalRow`s to JSON array using Jackson
+   *
+   * @param array The array of rows to convert
+   */
+  def write(array: ArrayData): Unit = writeArray(writeArrayData(array, 
arrElementWriter))
+
+  def writeLineEnding(): Unit = gen.writeRaw('\n')
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 19d0c8e..e4698d4 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -21,7 +21,7 @@ import java.util.Calendar
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, 
ParseModes}
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, 
GenericArrayData, ParseModes}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -352,7 +352,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     val jsonData = """{"a": 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
       InternalRow(1)
     )
   }
@@ -361,13 +361,13 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     val jsonData = """{"a" 1}"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData), gmtId),
+      JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
       null
     )
 
     // Other modes should still return `null`.
     checkEvaluation(
-      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), 
Literal(jsonData), gmtId),
+      JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), 
Literal(jsonData), gmtId),
       null
     )
   }
@@ -376,62 +376,62 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     val input = """[{"a": 1}, {"a": 2}]"""
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = InternalRow(1) :: InternalRow(2) :: Nil
-    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
   }
 
   test("from_json - input=object, schema=array, output=array of single row") {
     val input = """{"a": 1}"""
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = InternalRow(1) :: Nil
-    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
   }
 
   test("from_json - input=empty array, schema=array, output=empty array") {
     val input = "[ ]"
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = Nil
-    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
   }
 
   test("from_json - input=empty object, schema=array, output=array of single 
row with null") {
     val input = "{ }"
     val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
     val output = InternalRow(null) :: Nil
-    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
   }
 
   test("from_json - input=array of single object, schema=struct, output=single 
row") {
     val input = """[{"a": 1}]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = InternalRow(1)
-    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
   }
 
   test("from_json - input=array, schema=struct, output=null") {
     val input = """[{"a": 1}, {"a": 2}]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = null
-    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
   }
 
   test("from_json - input=empty array, schema=struct, output=null") {
     val input = """[]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = null
-    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
   }
 
   test("from_json - input=empty object, schema=struct, output=single row with 
null") {
     val input = """{  }"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val output = InternalRow(null)
-    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), 
output)
+    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
   }
 
   test("from_json null input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal.create(null, StringType), gmtId),
+      JsonToStructs(schema, Map.empty, Literal.create(null, StringType), 
gmtId),
       null
     )
   }
@@ -444,14 +444,14 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     c.set(2016, 0, 1, 0, 0, 0)
     c.set(Calendar.MILLISECOND, 123)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData1), gmtId),
+      JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId),
       InternalRow(c.getTimeInMillis * 1000L)
     )
     // The result doesn't change because the json string includes timezone 
string ("Z" here),
     // which means the string represents the timestamp string in the timezone 
regardless of
     // the timeZoneId parameter.
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal(jsonData1), Option("PST")),
+      JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")),
       InternalRow(c.getTimeInMillis * 1000L)
     )
 
@@ -461,7 +461,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
       c.set(2016, 0, 1, 0, 0, 0)
       c.set(Calendar.MILLISECOND, 0)
       checkEvaluation(
-        JsonToStruct(
+        JsonToStructs(
           schema,
           Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
           Literal(jsonData2),
@@ -469,7 +469,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
         InternalRow(c.getTimeInMillis * 1000L)
       )
       checkEvaluation(
-        JsonToStruct(
+        JsonToStructs(
           schema,
           Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
             DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
@@ -483,25 +483,52 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   test("SPARK-19543: from_json empty input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
-      JsonToStruct(schema, Map.empty, Literal.create(" ", StringType), gmtId),
+      JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
       null
     )
   }
 
-  test("to_json") {
+  test("to_json - struct") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val struct = Literal.create(create_row(1), schema)
     checkEvaluation(
-      StructToJson(Map.empty, struct, gmtId),
+      StructsToJson(Map.empty, struct, gmtId),
       """{"a":1}"""
     )
   }
 
+  test("to_json - array") {
+    val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: 
Nil))
+    val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil)
+    val output = """[{"a":1},{"a":2}]"""
+    checkEvaluation(
+      StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
+      output)
+  }
+
+  test("to_json - array with single empty row") {
+    val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: 
Nil))
+    val input = new GenericArrayData(InternalRow(null) :: Nil)
+    val output = """[{}]"""
+    checkEvaluation(
+      StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
+      output)
+  }
+
+  test("to_json - empty array") {
+    val inputSchema = ArrayType(StructType(StructField("a", IntegerType) :: 
Nil))
+    val input = new GenericArrayData(Nil)
+    val output = """[]"""
+    checkEvaluation(
+      StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId),
+      output)
+  }
+
   test("to_json null input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     val struct = Literal.create(null, schema)
     checkEvaluation(
-      StructToJson(Map.empty, struct, gmtId),
+      StructsToJson(Map.empty, struct, gmtId),
       null
     )
   }
@@ -514,16 +541,16 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
     val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema)
 
     checkEvaluation(
-      StructToJson(Map.empty, struct, gmtId),
+      StructsToJson(Map.empty, struct, gmtId),
       """{"t":"2016-01-01T00:00:00.000Z"}"""
     )
     checkEvaluation(
-      StructToJson(Map.empty, struct, Option("PST")),
+      StructsToJson(Map.empty, struct, Option("PST")),
       """{"t":"2015-12-31T16:00:00.000-08:00"}"""
     )
 
     checkEvaluation(
-      StructToJson(
+      StructsToJson(
         Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
           DateTimeUtils.TIMEZONE_OPTION -> gmtId.get),
         struct,
@@ -531,7 +558,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
       """{"t":"2016-01-01T00:00:00"}"""
     )
     checkEvaluation(
-      StructToJson(
+      StructsToJson(
         Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
           DateTimeUtils.TIMEZONE_OPTION -> "PST"),
         struct,

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 201f726..a9f089c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2978,7 +2978,8 @@ object functions {
 
   /**
    * (Scala-specific) Parses a column containing a JSON string into a 
`StructType` or `ArrayType`
-   * with the specified schema. Returns `null`, in the case of an unparseable 
string.
+   * of `StructType`s with the specified schema. Returns `null`, in the case 
of an unparseable
+   * string.
    *
    * @param e a string column containing JSON data.
    * @param schema the schema to use when parsing the json string
@@ -2989,7 +2990,7 @@ object functions {
    * @since 2.2.0
    */
   def from_json(e: Column, schema: DataType, options: Map[String, String]): 
Column = withExpr {
-    JsonToStruct(schema, options, e.expr)
+    JsonToStructs(schema, options, e.expr)
   }
 
   /**
@@ -3009,7 +3010,8 @@ object functions {
 
   /**
    * (Java-specific) Parses a column containing a JSON string into a 
`StructType` or `ArrayType`
-   * with the specified schema. Returns `null`, in the case of an unparseable 
string.
+   * of `StructType`s with the specified schema. Returns `null`, in the case 
of an unparseable
+   * string.
    *
    * @param e a string column containing JSON data.
    * @param schema the schema to use when parsing the json string
@@ -3036,7 +3038,7 @@ object functions {
     from_json(e, schema, Map.empty[String, String])
 
   /**
-   * Parses a column containing a JSON string into a `StructType` or 
`ArrayType`
+   * Parses a column containing a JSON string into a `StructType` or 
`ArrayType` of `StructType`s
    * with the specified schema. Returns `null`, in the case of an unparseable 
string.
    *
    * @param e a string column containing JSON data.
@@ -3049,7 +3051,7 @@ object functions {
     from_json(e, schema, Map.empty[String, String])
 
   /**
-   * Parses a column containing a JSON string into a `StructType` or 
`ArrayType`
+   * Parses a column containing a JSON string into a `StructType` or 
`ArrayType` of `StructType`s
    * with the specified schema. Returns `null`, in the case of an unparseable 
string.
    *
    * @param e a string column containing JSON data.
@@ -3062,10 +3064,11 @@ object functions {
     from_json(e, DataType.fromJson(schema), options)
 
   /**
-   * (Scala-specific) Converts a column containing a `StructType` into a JSON 
string with the
-   * specified schema. Throws an exception, in the case of an unsupported type.
+   * (Scala-specific) Converts a column containing a `StructType` or 
`ArrayType` of `StructType`s
+   * into a JSON string with the specified schema. Throws an exception, in the 
case of an
+   * unsupported type.
    *
-   * @param e a struct column.
+   * @param e a column containing a struct or array of the structs.
    * @param options options to control how the struct column is converted into 
a json string.
    *                accepts the same options and the json data source.
    *
@@ -3073,14 +3076,15 @@ object functions {
    * @since 2.1.0
    */
   def to_json(e: Column, options: Map[String, String]): Column = withExpr {
-    StructToJson(options, e.expr)
+    StructsToJson(options, e.expr)
   }
 
   /**
-   * (Java-specific) Converts a column containing a `StructType` into a JSON 
string with the
-   * specified schema. Throws an exception, in the case of an unsupported type.
+   * (Java-specific) Converts a column containing a `StructType` or 
`ArrayType` of `StructType`s
+   * into a JSON string with the specified schema. Throws an exception, in the 
case of an
+   * unsupported type.
    *
-   * @param e a struct column.
+   * @param e a column containing a struct or array of the structs.
    * @param options options to control how the struct column is converted into 
a json string.
    *                accepts the same options and the json data source.
    *
@@ -3091,10 +3095,10 @@ object functions {
     to_json(e, options.asScala.toMap)
 
   /**
-   * Converts a column containing a `StructType` into a JSON string with the
-   * specified schema. Throws an exception, in the case of an unsupported type.
+   * Converts a column containing a `StructType` or `ArrayType` of 
`StructType`s into a JSON string
+   * with the specified schema. Throws an exception, in the case of an 
unsupported type.
    *
-   * @param e a struct column.
+   * @param e a column containing a struct or array of the structs.
    *
    * @group collection_funcs
    * @since 2.1.0

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql 
b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 83243c5..b3cc2ce 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -3,6 +3,7 @@ describe function to_json;
 describe function extended to_json;
 select to_json(named_struct('a', 1, 'b', 2));
 select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), 
map('timestampFormat', 'dd/MM/yyyy'));
+select to_json(array(named_struct('a', 1, 'b', 2)));
 -- Check if errors handled
 select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 
'PERMISSIVE'));
 select to_json(named_struct('a', 1, 'b', 2), map('mode', 1));

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out 
b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index b57cbbc..315e173 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 16
+-- Number of queries: 17
 
 
 -- !query 0
@@ -7,7 +7,7 @@ describe function to_json
 -- !query 0 schema
 struct<function_desc:string>
 -- !query 0 output
-Class: org.apache.spark.sql.catalyst.expressions.StructToJson
+Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
 Function: to_json
 Usage: to_json(expr[, options]) - Returns a json string with a given struct 
value
 
@@ -17,13 +17,15 @@ describe function extended to_json
 -- !query 1 schema
 struct<function_desc:string>
 -- !query 1 output
-Class: org.apache.spark.sql.catalyst.expressions.StructToJson
+Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
 Extended Usage:
     Examples:
       > SELECT to_json(named_struct('a', 1, 'b', 2));
        {"a":1,"b":2}
       > SELECT to_json(named_struct('time', to_timestamp('2015-08-26', 
'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
        {"time":"26/08/2015"}
+      > SELECT to_json(array(named_struct('a', 1, 'b', 2));
+       [{"a":1,"b":2}]
   
 Function: to_json
 Usage: to_json(expr[, options]) - Returns a json string with a given struct 
value
@@ -32,7 +34,7 @@ Usage: to_json(expr[, options]) - Returns a json string with 
a given struct valu
 -- !query 2
 select to_json(named_struct('a', 1, 'b', 2))
 -- !query 2 schema
-struct<structtojson(named_struct(a, 1, b, 2)):string>
+struct<structstojson(named_struct(a, 1, b, 2)):string>
 -- !query 2 output
 {"a":1,"b":2}
 
@@ -40,54 +42,62 @@ struct<structtojson(named_struct(a, 1, b, 2)):string>
 -- !query 3
 select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), 
map('timestampFormat', 'dd/MM/yyyy'))
 -- !query 3 schema
-struct<structtojson(named_struct(time, to_timestamp('2015-08-26', 
'yyyy-MM-dd'))):string>
+struct<structstojson(named_struct(time, to_timestamp('2015-08-26', 
'yyyy-MM-dd'))):string>
 -- !query 3 output
 {"time":"26/08/2015"}
 
 
 -- !query 4
-select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 
'PERMISSIVE'))
+select to_json(array(named_struct('a', 1, 'b', 2)))
 -- !query 4 schema
-struct<>
+struct<structstojson(array(named_struct(a, 1, b, 2))):string>
 -- !query 4 output
-org.apache.spark.sql.AnalysisException
-Must use a map() function for options;; line 1 pos 7
+[{"a":1,"b":2}]
 
 
 -- !query 5
-select to_json(named_struct('a', 1, 'b', 2), map('mode', 1))
+select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 
'PERMISSIVE'))
 -- !query 5 schema
 struct<>
 -- !query 5 output
 org.apache.spark.sql.AnalysisException
-A type of keys and values in map() must be string, but got 
MapType(StringType,IntegerType,false);; line 1 pos 7
+Must use a map() function for options;; line 1 pos 7
 
 
 -- !query 6
-select to_json()
+select to_json(named_struct('a', 1, 'b', 2), map('mode', 1))
 -- !query 6 schema
 struct<>
 -- !query 6 output
 org.apache.spark.sql.AnalysisException
-Invalid number of arguments for function to_json; line 1 pos 7
+A type of keys and values in map() must be string, but got 
MapType(StringType,IntegerType,false);; line 1 pos 7
 
 
 -- !query 7
-describe function from_json
+select to_json()
 -- !query 7 schema
-struct<function_desc:string>
+struct<>
 -- !query 7 output
-Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
-Function: from_json
-Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the 
given `jsonStr` and `schema`.
+org.apache.spark.sql.AnalysisException
+Invalid number of arguments for function to_json; line 1 pos 7
 
 
 -- !query 8
-describe function extended from_json
+describe function from_json
 -- !query 8 schema
 struct<function_desc:string>
 -- !query 8 output
-Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
+Class: org.apache.spark.sql.catalyst.expressions.JsonToStructs
+Function: from_json
+Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the 
given `jsonStr` and `schema`.
+
+
+-- !query 9
+describe function extended from_json
+-- !query 9 schema
+struct<function_desc:string>
+-- !query 9 output
+Class: org.apache.spark.sql.catalyst.expressions.JsonToStructs
 Extended Usage:
     Examples:
       > SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE');
@@ -99,36 +109,36 @@ Function: from_json
 Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the 
given `jsonStr` and `schema`.
 
 
--- !query 9
+-- !query 10
 select from_json('{"a":1}', 'a INT')
--- !query 9 schema
-struct<jsontostruct({"a":1}):struct<a:int>>
--- !query 9 output
+-- !query 10 schema
+struct<jsontostructs({"a":1}):struct<a:int>>
+-- !query 10 output
 {"a":1}
 
 
--- !query 10
+-- !query 11
 select from_json('{"time":"26/08/2015"}', 'time Timestamp', 
map('timestampFormat', 'dd/MM/yyyy'))
--- !query 10 schema
-struct<jsontostruct({"time":"26/08/2015"}):struct<time:timestamp>>
--- !query 10 output
+-- !query 11 schema
+struct<jsontostructs({"time":"26/08/2015"}):struct<time:timestamp>>
+-- !query 11 output
 {"time":2015-08-26 00:00:00.0}
 
 
--- !query 11
+-- !query 12
 select from_json('{"a":1}', 1)
--- !query 11 schema
+-- !query 12 schema
 struct<>
--- !query 11 output
+-- !query 12 output
 org.apache.spark.sql.AnalysisException
 Expected a string literal instead of 1;; line 1 pos 7
 
 
--- !query 12
+-- !query 13
 select from_json('{"a":1}', 'a InvalidType')
--- !query 12 schema
+-- !query 13 schema
 struct<>
--- !query 12 output
+-- !query 13 output
 org.apache.spark.sql.AnalysisException
 
 DataType invalidtype() is not supported.(line 1, pos 2)
@@ -139,28 +149,28 @@ a InvalidType
 ; line 1 pos 7
 
 
--- !query 13
+-- !query 14
 select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE'))
--- !query 13 schema
+-- !query 14 schema
 struct<>
--- !query 13 output
+-- !query 14 output
 org.apache.spark.sql.AnalysisException
 Must use a map() function for options;; line 1 pos 7
 
 
--- !query 14
+-- !query 15
 select from_json('{"a":1}', 'a INT', map('mode', 1))
--- !query 14 schema
+-- !query 15 schema
 struct<>
--- !query 14 output
+-- !query 15 output
 org.apache.spark.sql.AnalysisException
 A type of keys and values in map() must be string, but got 
MapType(StringType,IntegerType,false);; line 1 pos 7
 
 
--- !query 15
+-- !query 16
 select from_json()
--- !query 15 schema
+-- !query 16 schema
 struct<>
--- !query 15 output
+-- !query 16 output
 org.apache.spark.sql.AnalysisException
 Invalid number of arguments for function from_json; line 1 pos 7

http://git-wip-us.apache.org/repos/asf/spark/blob/0cdcf911/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 2345b82..170c238 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -156,7 +156,7 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
       Row(Seq(Row(1, "a"), Row(2, null), Row(null, null))))
   }
 
-  test("to_json") {
+  test("to_json - struct") {
     val df = Seq(Tuple1(Tuple1(1))).toDF("a")
 
     checkAnswer(
@@ -164,6 +164,14 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
       Row("""{"_1":1}""") :: Nil)
   }
 
+  test("to_json - array") {
+    val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a")
+
+    checkAnswer(
+      df.select(to_json($"a")),
+      Row("""[{"_1":1}]""") :: Nil)
+  }
+
   test("to_json with option") {
     val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 
18:00:00.0")))).toDF("a")
     val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
@@ -184,7 +192,7 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
       "Unable to convert column a of type calendarinterval to JSON."))
   }
 
-  test("roundtrip in to_json and from_json") {
+  test("roundtrip in to_json and from_json - struct") {
     val dfOne = Seq(Tuple1(Tuple1(1)), Tuple1(null)).toDF("struct")
     val schemaOne = dfOne.schema(0).dataType.asInstanceOf[StructType]
     val readBackOne = dfOne.select(to_json($"struct").as("json"))
@@ -198,6 +206,20 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
     checkAnswer(dfTwo, readBackTwo)
   }
 
+  test("roundtrip in to_json and from_json - array") {
+    val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: 
Nil)).toDF("array")
+    val schemaOne = dfOne.schema(0).dataType
+    val readBackOne = dfOne.select(to_json($"array").as("json"))
+      .select(from_json($"json", schemaOne).as("array"))
+    checkAnswer(dfOne, readBackOne)
+
+    val dfTwo = Seq(Some("""[{"a":1}]"""), None).toDF("json")
+    val schemaTwo = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+    val readBackTwo = dfTwo.select(from_json($"json", schemaTwo).as("array"))
+      .select(to_json($"array").as("json"))
+    checkAnswer(dfTwo, readBackTwo)
+  }
+
   test("SPARK-19637 Support to_json in SQL") {
     val df1 = Seq(Tuple1(Tuple1(1))).toDF("a")
     checkAnswer(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to