Repository: spark Updated Branches: refs/heads/master 1a9857476 -> 371e4e205
[SPARK-21513][SQL] Allow UDF to_json support converting MapType to json # What changes were proposed in this pull request? UDF to_json currently only supports converting `StructType` or `ArrayType` of `StructType`s to a json output string. According to the discussion in JIRA SPARK-21513, this change allows `to_json` to also support converting `MapType` and `ArrayType` of `MapType`s to a json output string. This PR is for SQL and Scala API only. # How was this patch tested? Added unit test cases. cc viirya HyukjinKwon Author: goldmedal <liugs...@gmail.com> Author: Jia-Xuan Liu <liugs...@gmail.com> Closes #18875 from goldmedal/SPARK-21513. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/371e4e20 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/371e4e20 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/371e4e20 Branch: refs/heads/master Commit: 371e4e2053eb7535a27dd71756a3a479aae22306 Parents: 1a98574 Author: goldmedal <liugs...@gmail.com> Authored: Wed Sep 13 09:43:00 2017 +0900 Committer: hyukjinkwon <gurwls...@gmail.com> Committed: Wed Sep 13 09:43:00 2017 +0900 ---------------------------------------------------------------------- .../catalyst/expressions/jsonExpressions.scala | 38 ++++- .../sql/catalyst/json/JacksonGenerator.scala | 65 +++++++-- .../expressions/JsonExpressionsSuite.scala | 49 ++++++- .../catalyst/json/JacksonGeneratorSuite.scala | 113 +++++++++++++++ .../scala/org/apache/spark/sql/functions.scala | 17 +-- .../sql-tests/inputs/json-functions.sql | 5 + .../sql-tests/results/json-functions.sql.out | 144 ++++++++++++------- .../apache/spark/sql/JsonFunctionsSuite.scala | 16 +++ 8 files changed, 378 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/371e4e20/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala 
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index ee5da1a..1341631 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.json._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData, MapData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -604,7 +604,8 @@ case class JsonToStructs( } /** - * Converts a [[StructType]] or [[ArrayType]] of [[StructType]]s to a json output string. + * Converts a [[StructType]], [[ArrayType]] of [[StructType]]s, [[MapType]] + * or [[ArrayType]] of [[MapType]]s to a json output string. 
*/ // scalastyle:off line.size.limit @ExpressionDescription( @@ -617,6 +618,14 @@ case class JsonToStructs( {"time":"26/08/2015"} > SELECT _FUNC_(array(named_struct('a', 1, 'b', 2)); [{"a":1,"b":2}] + > SELECT _FUNC_(map('a',named_struct('b',1))); + {"a":{"b":1}} + > SELECT _FUNC_(map(named_struct('a',1),named_struct('b',2))); + {"[1]":{"b":2}} + > SELECT _FUNC_(map('a',1)); + {"a":1} + > SELECT _FUNC_(array((map('a',1)))); + [{"a":1}] """, since = "2.2.0") // scalastyle:on line.size.limit @@ -648,6 +657,8 @@ case class StructsToJson( lazy val rowSchema = child.dataType match { case st: StructType => st case ArrayType(st: StructType, _) => st + case mt: MapType => mt + case ArrayType(mt: MapType, _) => mt } // This converts rows to the JSON output according to the given schema. @@ -669,6 +680,14 @@ case class StructsToJson( (arr: Any) => gen.write(arr.asInstanceOf[ArrayData]) getAndReset() + case _: MapType => + (map: Any) => + gen.write(map.asInstanceOf[MapData]) + getAndReset() + case ArrayType(_: MapType, _) => + (arr: Any) => + gen.write(arr.asInstanceOf[ArrayData]) + getAndReset() } } @@ -677,14 +696,25 @@ case class StructsToJson( override def checkInputDataTypes(): TypeCheckResult = child.dataType match { case _: StructType | ArrayType(_: StructType, _) => try { - JacksonUtils.verifySchema(rowSchema) + JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType]) + TypeCheckResult.TypeCheckSuccess + } catch { + case e: UnsupportedOperationException => + TypeCheckResult.TypeCheckFailure(e.getMessage) + } + case _: MapType | ArrayType(_: MapType, _) => + // TODO: let `JacksonUtils.verifySchema` verify a `MapType` + try { + val st = StructType(StructField("a", rowSchema.asInstanceOf[MapType]) :: Nil) + JacksonUtils.verifySchema(st) TypeCheckResult.TypeCheckSuccess } catch { case e: UnsupportedOperationException => TypeCheckResult.TypeCheckFailure(e.getMessage) } case _ => TypeCheckResult.TypeCheckFailure( - s"Input type ${child.dataType.simpleString} must be a 
struct or array of structs.") + s"Input type ${child.dataType.simpleString} must be a struct, array of structs or " + "a map or array of map.") } override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = http://git-wip-us.apache.org/repos/asf/spark/blob/371e4e20/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 1d302ae..dfe7e28 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -26,8 +26,15 @@ import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} import org.apache.spark.sql.types._ +/** + * `JacksonGenerator` can only be initialized with a `StructType` or a `MapType`. + * Once it is initialized with `StructType`, it can be used to write out a struct or an array of + * structs. Once it is initialized with `MapType`, it can be used to write out a map or an array + * of maps. An exception will be thrown if trying to write out a struct if it is initialized with + * a `MapType`, and vice versa. + */ private[sql] class JacksonGenerator( - schema: StructType, + dataType: DataType, writer: Writer, options: JSONOptions) { // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate @@ -35,11 +42,34 @@ private[sql] class JacksonGenerator( // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`. private type ValueWriter = (SpecializedGetters, Int) => Unit + // `JacksonGenerator` can only be initialized with a `StructType` or a `MapType`. 
+ require(dataType.isInstanceOf[StructType] | dataType.isInstanceOf[MapType], + "JacksonGenerator only supports to be initialized with a StructType " + + s"or MapType but got ${dataType.simpleString}") + // `ValueWriter`s for all fields of the schema - private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray + private lazy val rootFieldWriters: Array[ValueWriter] = dataType match { + case st: StructType => st.map(_.dataType).map(makeWriter).toArray + case _ => throw new UnsupportedOperationException( + s"Initial type ${dataType.simpleString} must be a struct") + } + // `ValueWriter` for array data storing rows of the schema. - private val arrElementWriter: ValueWriter = (arr: SpecializedGetters, i: Int) => { - writeObject(writeFields(arr.getStruct(i, schema.length), schema, rootFieldWriters)) + private lazy val arrElementWriter: ValueWriter = dataType match { + case st: StructType => + (arr: SpecializedGetters, i: Int) => { + writeObject(writeFields(arr.getStruct(i, st.length), st, rootFieldWriters)) + } + case mt: MapType => + (arr: SpecializedGetters, i: Int) => { + writeObject(writeMapData(arr.getMap(i), mt, mapElementWriter)) + } + } + + private lazy val mapElementWriter: ValueWriter = dataType match { + case mt: MapType => makeWriter(mt.valueType) + case _ => throw new UnsupportedOperationException( + s"Initial type ${dataType.simpleString} must be a map") } private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) @@ -189,18 +219,37 @@ private[sql] class JacksonGenerator( def flush(): Unit = gen.flush() /** - * Transforms a single `InternalRow` to JSON object using Jackson + * Transforms a single `InternalRow` to JSON object using Jackson. + * This api calling will be validated through accessing `rootFieldWriters`. 
* * @param row The row to convert */ - def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters)) + def write(row: InternalRow): Unit = { + writeObject(writeFields( + fieldWriters = rootFieldWriters, + row = row, + schema = dataType.asInstanceOf[StructType])) + } /** - * Transforms multiple `InternalRow`s to JSON array using Jackson + * Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson * - * @param array The array of rows to convert + * @param array The array of rows or maps to convert */ def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter)) + /** + * Transforms a single `MapData` to JSON object using Jackson. + * This api calling will be validated through accessing `mapElementWriter`. + * + * @param map a map to convert + */ + def write(map: MapData): Unit = { + writeObject(writeMapData( + fieldWriter = mapElementWriter, + map = map, + mapType = dataType.asInstanceOf[MapType])) + } + def writeLineEnding(): Unit = gen.writeRaw('\n') } http://git-wip-us.apache.org/repos/asf/spark/blob/371e4e20/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 5de1143..a0bbe02 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -22,7 +22,7 @@ import java.util.Calendar import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, 
GenericArrayData, PermissiveMode} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -612,6 +612,53 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { ) } + test("SPARK-21513: to_json support map[string, struct] to json") { + val schema = MapType(StringType, StructType(StructField("a", IntegerType) :: Nil)) + val input = Literal.create(ArrayBasedMapData(Map("test" -> InternalRow(1))), schema) + checkEvaluation( + StructsToJson(Map.empty, input), + """{"test":{"a":1}}""" + ) + } + + test("SPARK-21513: to_json support map[struct, struct] to json") { + val schema = MapType(StructType(StructField("a", IntegerType) :: Nil), + StructType(StructField("b", IntegerType) :: Nil)) + val input = Literal.create(ArrayBasedMapData(Map(InternalRow(1) -> InternalRow(2))), schema) + checkEvaluation( + StructsToJson(Map.empty, input), + """{"[1]":{"b":2}}""" + ) + } + + test("SPARK-21513: to_json support map[string, integer] to json") { + val schema = MapType(StringType, IntegerType) + val input = Literal.create(ArrayBasedMapData(Map("a" -> 1)), schema) + checkEvaluation( + StructsToJson(Map.empty, input), + """{"a":1}""" + ) + } + + test("to_json - array with maps") { + val inputSchema = ArrayType(MapType(StringType, IntegerType)) + val input = new GenericArrayData(ArrayBasedMapData( + Map("a" -> 1)) :: ArrayBasedMapData(Map("b" -> 2)) :: Nil) + val output = """[{"a":1},{"b":2}]""" + checkEvaluation( + StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId), + output) + } + + test("to_json - array with single map") { + val inputSchema = ArrayType(MapType(StringType, IntegerType)) + val input = new GenericArrayData(ArrayBasedMapData(Map("a" -> 1)) :: Nil) + val output = """[{"a":1}]""" + checkEvaluation( + StructsToJson(Map.empty, Literal.create(input, inputSchema), gmtId), + 
output) + } + test("to_json: verify MapType's value type instead of key type") { // Keys in map are treated as strings when converting to JSON. The type doesn't matter at all. val mapType1 = MapType(CalendarIntervalType, IntegerType) http://git-wip-us.apache.org/repos/asf/spark/blob/371e4e20/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala new file mode 100644 index 0000000..9b27490 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.json + +import java.io.CharArrayWriter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.types._ + +class JacksonGeneratorSuite extends SparkFunSuite { + + val gmtId = DateTimeUtils.TimeZoneGMT.getID + val option = new JSONOptions(Map.empty, gmtId) + + test("initial with StructType and write out a row") { + val dataType = StructType(StructField("a", IntegerType) :: Nil) + val input = InternalRow(1) + val writer = new CharArrayWriter() + val gen = new JacksonGenerator(dataType, writer, option) + gen.write(input) + gen.flush() + assert(writer.toString === """{"a":1}""") + } + + test("initial with StructType and write out rows") { + val dataType = StructType(StructField("a", IntegerType) :: Nil) + val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil) + val writer = new CharArrayWriter() + val gen = new JacksonGenerator(dataType, writer, option) + gen.write(input) + gen.flush() + assert(writer.toString === """[{"a":1},{"a":2}]""") + } + + test("initial with StructType and write out an array with single empty row") { + val dataType = StructType(StructField("a", IntegerType) :: Nil) + val input = new GenericArrayData(InternalRow(null) :: Nil) + val writer = new CharArrayWriter() + val gen = new JacksonGenerator(dataType, writer, option) + gen.write(input) + gen.flush() + assert(writer.toString === """[{}]""") + } + + test("initial with StructType and write out an empty array") { + val dataType = StructType(StructField("a", IntegerType) :: Nil) + val input = new GenericArrayData(Nil) + val writer = new CharArrayWriter() + val gen = new JacksonGenerator(dataType, writer, option) + gen.write(input) + gen.flush() + assert(writer.toString === """[]""") + } + + test("initial with Map and write out a map data") { + val dataType = MapType(StringType, 
IntegerType) + val input = ArrayBasedMapData(Map("a" -> 1)) + val writer = new CharArrayWriter() + val gen = new JacksonGenerator(dataType, writer, option) + gen.write(input) + gen.flush() + assert(writer.toString === """{"a":1}""") + } + + test("initial with Map and write out an array of maps") { + val dataType = MapType(StringType, IntegerType) + val input = new GenericArrayData( + ArrayBasedMapData(Map("a" -> 1)) :: ArrayBasedMapData(Map("b" -> 2)) :: Nil) + val writer = new CharArrayWriter() + val gen = new JacksonGenerator(dataType, writer, option) + gen.write(input) + gen.flush() + assert(writer.toString === """[{"a":1},{"b":2}]""") + } + + test("error handling: initial with StructType but error calling write a map") { + val dataType = StructType(StructField("a", IntegerType) :: Nil) + val input = ArrayBasedMapData(Map("a" -> 1)) + val writer = new CharArrayWriter() + val gen = new JacksonGenerator(dataType, writer, option) + intercept[UnsupportedOperationException] { + gen.write(input) + } + } + + test("error handling: initial with MapType and write out a row") { + val dataType = MapType(StringType, IntegerType) + val input = InternalRow(1) + val writer = new CharArrayWriter() + val gen = new JacksonGenerator(dataType, writer, option) + intercept[UnsupportedOperationException] { + gen.write(input) + } + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/371e4e20/sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 53b2552..47324ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3119,9 +3119,9 @@ object functions { } /** - * (Scala-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s - * into a JSON string 
with the specified schema. Throws an exception, in the case of an - * unsupported type. + * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s, + * a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema. + * Throws an exception, in the case of an unsupported type. * * @param e a column containing a struct or array of the structs. * @param options options to control how the struct column is converted into a json string. @@ -3135,9 +3135,9 @@ object functions { } /** - * (Java-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s - * into a JSON string with the specified schema. Throws an exception, in the case of an - * unsupported type. + * (Java-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s, + * a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema. + * Throws an exception, in the case of an unsupported type. * * @param e a column containing a struct or array of the structs. * @param options options to control how the struct column is converted into a json string. @@ -3150,8 +3150,9 @@ object functions { to_json(e, options.asScala.toMap) /** - * Converts a column containing a `StructType` or `ArrayType` of `StructType`s into a JSON string - * with the specified schema. Throws an exception, in the case of an unsupported type. + * Converts a column containing a `StructType`, `ArrayType` of `StructType`s, + * a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema. + * Throws an exception, in the case of an unsupported type. * * @param e a column containing a struct or array of the structs. 
* http://git-wip-us.apache.org/repos/asf/spark/blob/371e4e20/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql index 5a46fb4..fea069e 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql @@ -4,6 +4,11 @@ describe function extended to_json; select to_json(named_struct('a', 1, 'b', 2)); select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy')); select to_json(array(named_struct('a', 1, 'b', 2))); +select to_json(map(named_struct('a', 1, 'b', 2), named_struct('a', 1, 'b', 2))); +select to_json(map('a', named_struct('a', 1, 'b', 2))); +select to_json(map('a', 1)); +select to_json(array(map('a',1))); +select to_json(array(map('a',1), map('b',2))); -- Check if errors handled select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')); select to_json(named_struct('a', 1, 'b', 2), map('mode', 1)); http://git-wip-us.apache.org/repos/asf/spark/blob/371e4e20/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index ae21d00..dcced79 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 21 +-- Number of queries: 26 -- !query 0 @@ -26,6 +26,14 @@ Extended Usage: {"time":"26/08/2015"} > SELECT to_json(array(named_struct('a', 1, 'b', 2)); [{"a":1,"b":2}] + > 
SELECT to_json(map('a',named_struct('b',1))); + {"a":{"b":1}} + > SELECT to_json(map(named_struct('a',1),named_struct('b',2))); + {"[1]":{"b":2}} + > SELECT to_json(map('a',1)); + {"a":1} + > SELECT to_json(array((map('a',1)))); + [{"a":1}] Since: 2.2.0 @@ -58,47 +66,87 @@ struct<structstojson(array(named_struct(a, 1, b, 2))):string> -- !query 5 -select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')) +select to_json(map(named_struct('a', 1, 'b', 2), named_struct('a', 1, 'b', 2))) -- !query 5 schema -struct<> +struct<structstojson(map(named_struct(a, 1, b, 2), named_struct(a, 1, b, 2))):string> -- !query 5 output +{"[1,2]":{"a":1,"b":2}} + + +-- !query 6 +select to_json(map('a', named_struct('a', 1, 'b', 2))) +-- !query 6 schema +struct<structstojson(map(a, named_struct(a, 1, b, 2))):string> +-- !query 6 output +{"a":{"a":1,"b":2}} + + +-- !query 7 +select to_json(map('a', 1)) +-- !query 7 schema +struct<structstojson(map(a, 1)):string> +-- !query 7 output +{"a":1} + + +-- !query 8 +select to_json(array(map('a',1))) +-- !query 8 schema +struct<structstojson(array(map(a, 1))):string> +-- !query 8 output +[{"a":1}] + + +-- !query 9 +select to_json(array(map('a',1), map('b',2))) +-- !query 9 schema +struct<structstojson(array(map(a, 1), map(b, 2))):string> +-- !query 9 output +[{"a":1},{"b":2}] + + +-- !query 10 +select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')) +-- !query 10 schema +struct<> +-- !query 10 output org.apache.spark.sql.AnalysisException Must use a map() function for options;; line 1 pos 7 --- !query 6 +-- !query 11 select to_json(named_struct('a', 1, 'b', 2), map('mode', 1)) --- !query 6 schema +-- !query 11 schema struct<> --- !query 6 output +-- !query 11 output org.apache.spark.sql.AnalysisException A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7 --- !query 7 +-- !query 12 select to_json() --- !query 7 schema +-- !query 12 schema 
struct<> --- !query 7 output +-- !query 12 output org.apache.spark.sql.AnalysisException Invalid number of arguments for function to_json; line 1 pos 7 --- !query 8 +-- !query 13 describe function from_json --- !query 8 schema +-- !query 13 schema struct<function_desc:string> --- !query 8 output +-- !query 13 output Class: org.apache.spark.sql.catalyst.expressions.JsonToStructs Function: from_json Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`. --- !query 9 +-- !query 14 describe function extended from_json --- !query 9 schema +-- !query 14 schema struct<function_desc:string> --- !query 9 output +-- !query 14 output Class: org.apache.spark.sql.catalyst.expressions.JsonToStructs Extended Usage: Examples: @@ -113,36 +161,36 @@ Function: from_json Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`. --- !query 10 +-- !query 15 select from_json('{"a":1}', 'a INT') --- !query 10 schema +-- !query 15 schema struct<jsontostructs({"a":1}):struct<a:int>> --- !query 10 output +-- !query 15 output {"a":1} --- !query 11 +-- !query 16 select from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) --- !query 11 schema +-- !query 16 schema struct<jsontostructs({"time":"26/08/2015"}):struct<time:timestamp>> --- !query 11 output +-- !query 16 output {"time":2015-08-26 00:00:00.0} --- !query 12 +-- !query 17 select from_json('{"a":1}', 1) --- !query 12 schema +-- !query 17 schema struct<> --- !query 12 output +-- !query 17 output org.apache.spark.sql.AnalysisException Expected a string literal instead of 1;; line 1 pos 7 --- !query 13 +-- !query 18 select from_json('{"a":1}', 'a InvalidType') --- !query 13 schema +-- !query 18 schema struct<> --- !query 13 output +-- !query 18 output org.apache.spark.sql.AnalysisException DataType invalidtype is not supported.(line 1, pos 2) @@ -153,60 +201,60 @@ a InvalidType ; line 1 pos 7 --- 
!query 14 +-- !query 19 select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE')) --- !query 14 schema +-- !query 19 schema struct<> --- !query 14 output +-- !query 19 output org.apache.spark.sql.AnalysisException Must use a map() function for options;; line 1 pos 7 --- !query 15 +-- !query 20 select from_json('{"a":1}', 'a INT', map('mode', 1)) --- !query 15 schema +-- !query 20 schema struct<> --- !query 15 output +-- !query 20 output org.apache.spark.sql.AnalysisException A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7 --- !query 16 +-- !query 21 select from_json() --- !query 16 schema +-- !query 21 schema struct<> --- !query 16 output +-- !query 21 output org.apache.spark.sql.AnalysisException Invalid number of arguments for function from_json; line 1 pos 7 --- !query 17 +-- !query 22 SELECT json_tuple('{"a" : 1, "b" : 2}', CAST(NULL AS STRING), 'b', CAST(NULL AS STRING), 'a') --- !query 17 schema +-- !query 22 schema struct<c0:string,c1:string,c2:string,c3:string> --- !query 17 output +-- !query 22 output NULL 2 NULL 1 --- !query 18 +-- !query 23 CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 1, "b": 2}', 'a') --- !query 18 schema +-- !query 23 schema struct<> --- !query 18 output +-- !query 23 output --- !query 19 +-- !query 24 SELECT json_tuple(jsonField, 'b', CAST(NULL AS STRING), a) FROM jsonTable --- !query 19 schema +-- !query 24 schema struct<c0:string,c1:string,c2:string> --- !query 19 output +-- !query 24 output 2 NULL 1 --- !query 20 +-- !query 25 DROP VIEW IF EXISTS jsonTable --- !query 20 schema +-- !query 25 schema struct<> --- !query 20 output +-- !query 25 output http://git-wip-us.apache.org/repos/asf/spark/blob/371e4e20/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 119af21..00d2acc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -180,10 +180,26 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { test("to_json - array") { val df = Seq(Tuple1(Tuple1(1) :: Nil)).toDF("a") + val df2 = Seq(Tuple1(Map("a" -> 1) :: Nil)).toDF("a") checkAnswer( df.select(to_json($"a")), Row("""[{"_1":1}]""") :: Nil) + checkAnswer( + df2.select(to_json($"a")), + Row("""[{"a":1}]""") :: Nil) + } + + test("to_json - map") { + val df1 = Seq(Map("a" -> Tuple1(1))).toDF("a") + val df2 = Seq(Map("a" -> 1)).toDF("a") + + checkAnswer( + df1.select(to_json($"a")), + Row("""{"a":{"_1":1}}""") :: Nil) + checkAnswer( + df2.select(to_json($"a")), + Row("""{"a":1}""") :: Nil) } test("to_json with option") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org