Repository: spark Updated Branches: refs/heads/master cfac17ee1 -> 01dd00830
[SPARK-17764][SQL] Add `to_json` support for converting a nested struct column to a JSON string ## What changes were proposed in this pull request? This PR proposes to add a `to_json` function as the counterpart of `from_json` in Scala, Java and Python. It'd be useful to be able to convert the same column both from and to JSON. Also, some data sources do not support nested types; if we are forced to save a DataFrame to one of those data sources, this function offers a workaround. The usage is as below: ``` scala val df = Seq(Tuple1(Tuple1(1))).toDF("a") df.select(to_json($"a").as("json")).show() ``` ``` bash +--------+ | json| +--------+ |{"_1":1}| +--------+ ``` ## How was this patch tested? Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite`. Author: hyukjinkwon <gurwls...@gmail.com> Closes #15354 from HyukjinKwon/SPARK-17764. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01dd0083 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01dd0083 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01dd0083 Branch: refs/heads/master Commit: 01dd0083011741c2bbe5ae1d2a25f2c9a1302b76 Parents: cfac17e Author: hyukjinkwon <gurwls...@gmail.com> Authored: Tue Nov 1 12:46:41 2016 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Tue Nov 1 12:46:41 2016 -0700 ---------------------------------------------------------------------- python/pyspark/sql/functions.py | 23 +++ python/pyspark/sql/readwriter.py | 2 +- python/pyspark/sql/streaming.py | 2 +- .../catalyst/expressions/jsonExpressions.scala | 48 ++++- .../sql/catalyst/json/JacksonGenerator.scala | 197 ++++++++++++++++++ .../spark/sql/catalyst/json/JacksonUtils.scala | 26 +++ .../expressions/JsonExpressionsSuite.scala | 9 + .../scala/org/apache/spark/sql/Dataset.scala | 2 +- .../datasources/json/JacksonGenerator.scala | 198 ------------------- .../datasources/json/JsonFileFormat.scala | 2 +- .../scala/org/apache/spark/sql/functions.scala | 44 ++++- .../apache/spark/sql/JsonFunctionsSuite.scala | 30 ++- 12 files changed, 372 insertions(+), 211 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/python/pyspark/sql/functions.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 7fa3fd2..45e3c22 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1744,6 +1744,29 @@ def from_json(col, schema, options={}): return Column(jc) +@ignore_unicode_prefix +@since(2.1) +def to_json(col, options={}): + """ + Converts a column containing a [[StructType]] into a JSON string. Throws an exception + in the case of an unsupported type. + + :param col: name of column containing the struct + :param options: options to control converting.
Accepts the same options as the json datasource. + + >>> from pyspark.sql import Row + >>> from pyspark.sql.types import * + >>> data = [(1, Row(name='Alice', age=2))] + >>> df = spark.createDataFrame(data, ("key", "value")) + >>> df.select(to_json(df.value).alias("json")).collect() + [Row(json=u'{"age":2,"name":"Alice"}')] + """ + + sc = SparkContext._active_spark_context + jc = sc._jvm.functions.to_json(_to_java_column(col), options) + return Column(jc) + + @since(1.5) def size(col): """ http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index bc786ef..b0c51b1 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -161,7 +161,7 @@ class DataFrameReader(OptionUtils): mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None): """ Loads a JSON file (`JSON Lines text format or newline-delimited JSON - <[http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per + <http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per record) and returns the result as a :class`DataFrame`. If the ``schema`` parameter is not specified, this function goes http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/python/pyspark/sql/streaming.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 559647b..1c94413 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -641,7 +641,7 @@ class DataStreamReader(OptionUtils): timestampFormat=None): """ Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON - <[http://jsonlines.org/>`_) and returns a :class`DataFrame`. + <http://jsonlines.org/>`_) and returns a :class`DataFrame`. If the ``schema`` parameter is not specified, this function goes through the input once to determine the input schema.
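With the Python API in place, here is a quick round-trip illustration of how the new `to_json` pairs with the existing `from_json`, as the description above suggests. This is a minimal sketch, not part of the patch, and it assumes a Spark 2.1 `spark-shell` session where `spark.implicits._` is already in scope:

```scala
import org.apache.spark.sql.functions.{from_json, to_json}
import org.apache.spark.sql.types.{IntegerType, StructType}

// Serialize a struct column to a JSON string, then parse it back.
val df = Seq(Tuple1(Tuple1(1))).toDF("a")
val schema = new StructType().add("_1", IntegerType)
df.select(to_json($"a").as("json"))                    // -> {"_1":1}
  .select(from_json($"json", schema).as("roundtrip"))  // -> back to struct<_1:int>
  .show()
```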
http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 65dbd6a..244a5a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -17,16 +17,17 @@ package org.apache.spark.sql.catalyst.expressions -import java.io.{ByteArrayOutputStream, StringWriter} +import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter} import scala.util.parsing.combinator.RegexParsers import com.fasterxml.jackson.core._ +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException} +import org.apache.spark.sql.catalyst.json._ import org.apache.spark.sql.catalyst.util.ParseModes import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -494,3 +495,46 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child: override def inputTypes: Seq[AbstractDataType] = StringType :: Nil } + +/** + * Converts a [[StructType]] to a json output string. + */ +case class StructToJson(options: Map[String, String], child: Expression) + extends Expression with CodegenFallback with ExpectsInputTypes { + override def nullable: Boolean = true + + @transient + lazy val writer = new CharArrayWriter() + + @transient + lazy val gen = + new JacksonGenerator(child.dataType.asInstanceOf[StructType], writer) + + override def dataType: DataType = StringType + override def children: Seq[Expression] = child :: Nil + + override def checkInputDataTypes(): TypeCheckResult = { + if (StructType.acceptsType(child.dataType)) { + try { + JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType]) + TypeCheckResult.TypeCheckSuccess + } catch { + case e: UnsupportedOperationException => + TypeCheckResult.TypeCheckFailure(e.getMessage) + } + } else { + TypeCheckResult.TypeCheckFailure( + s"$prettyName requires that the expression is a struct expression.") + } + } + + override def eval(input: InternalRow): Any = { + gen.write(child.eval(input).asInstanceOf[InternalRow]) + gen.flush() + val json = writer.toString + writer.reset() + UTF8String.fromString(json) + } + + override def inputTypes: Seq[AbstractDataType] = StructType :: Nil +} http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala new file mode 100644 index 0000000..4b548e0 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.json + +import java.io.Writer + +import com.fasterxml.jackson.core._ + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters +import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} +import org.apache.spark.sql.types._ + +private[sql] class JacksonGenerator( + schema: StructType, + writer: Writer, + options: JSONOptions = new JSONOptions(Map.empty[String, String])) { + // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate + // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that + // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`. + private type ValueWriter = (SpecializedGetters, Int) => Unit + + // `ValueWriter`s for all fields of the schema + private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray + + private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) + + private def makeWriter(dataType: DataType): ValueWriter = dataType match { + case NullType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeNull() + + case BooleanType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeBoolean(row.getBoolean(ordinal)) + + case ByteType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeNumber(row.getByte(ordinal)) + + case ShortType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeNumber(row.getShort(ordinal)) + + case IntegerType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeNumber(row.getInt(ordinal)) + + case LongType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeNumber(row.getLong(ordinal)) + + case FloatType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeNumber(row.getFloat(ordinal)) + + case DoubleType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeNumber(row.getDouble(ordinal)) + + case StringType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeString(row.getUTF8String(ordinal).toString) + + case TimestampType => + (row: SpecializedGetters, ordinal: Int) => + val timestampString = + options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) + gen.writeString(timestampString) + + case DateType => + (row: SpecializedGetters, ordinal: Int) => + val dateString = + options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) + gen.writeString(dateString) + + case BinaryType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeBinary(row.getBinary(ordinal)) + + case dt: DecimalType => + (row: SpecializedGetters, ordinal: Int) => + gen.writeNumber(row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal) + + case st: StructType => + val fieldWriters = 
st.map(_.dataType).map(makeWriter) + (row: SpecializedGetters, ordinal: Int) => + writeObject(writeFields(row.getStruct(ordinal, st.length), st, fieldWriters)) + + case at: ArrayType => + val elementWriter = makeWriter(at.elementType) + (row: SpecializedGetters, ordinal: Int) => + writeArray(writeArrayData(row.getArray(ordinal), elementWriter)) + + case mt: MapType => + val valueWriter = makeWriter(mt.valueType) + (row: SpecializedGetters, ordinal: Int) => + writeObject(writeMapData(row.getMap(ordinal), mt, valueWriter)) + + // For UDT values, they should be in the SQL type's corresponding value type. + // We should not see values of the user-defined class here. + // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is + // an ArrayData here, instead of a Vector. + case t: UserDefinedType[_] => + makeWriter(t.sqlType) + + case _ => + (row: SpecializedGetters, ordinal: Int) => + val v = row.get(ordinal, dataType) + sys.error(s"Failed to convert value $v (class of ${v.getClass}) " + + s"with the type of $dataType to JSON.") + } + + private def writeObject(f: => Unit): Unit = { + gen.writeStartObject() + f + gen.writeEndObject() + } + + private def writeFields( + row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = { + var i = 0 + while (i < row.numFields) { + val field = schema(i) + if (!row.isNullAt(i)) { + gen.writeFieldName(field.name) + fieldWriters(i).apply(row, i) + } + i += 1 + } + } + + private def writeArray(f: => Unit): Unit = { + gen.writeStartArray() + f + gen.writeEndArray() + } + + private def writeArrayData( + array: ArrayData, fieldWriter: ValueWriter): Unit = { + var i = 0 + while (i < array.numElements()) { + if (!array.isNullAt(i)) { + fieldWriter.apply(array, i) + } else { + gen.writeNull() + } + i += 1 + } + } + + private def writeMapData( + map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = { + val keyArray = map.keyArray() + val valueArray = map.valueArray() + var i = 0 + while (i < map.numElements()) { + gen.writeFieldName(keyArray.get(i, mapType.keyType).toString) + if (!valueArray.isNullAt(i)) { + fieldWriter.apply(valueArray, i) + } else { + gen.writeNull() + } + i += 1 + } + } + + def close(): Unit = gen.close() + + def flush(): Unit = gen.flush() + + /** + * Transforms a single InternalRow to JSON using Jackson + * + * @param row The row to convert + */ + def write(row: InternalRow): Unit = { + writeObject { + writeFields(row, schema, rootFieldWriters) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala index c4d9abb..3b23c6c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.json import com.fasterxml.jackson.core.{JsonParser, JsonToken} +import org.apache.spark.sql.types._ + object JacksonUtils { /** * Advance the parser until a null or a specific token is found @@ -29,4 +31,28 @@ object JacksonUtils { case x => x != stopOn } } + + /** + * Verify that the schema is supported for conversion to JSON.
+ */ + def verifySchema(schema: StructType): Unit = { + def verifyType(name: String, dataType: DataType): Unit = dataType match { + case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | + DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType => + + case st: StructType => st.foreach(field => verifyType(field.name, field.dataType)) + + case at: ArrayType => verifyType(name, at.elementType) + + case mt: MapType => verifyType(name, mt.keyType) + + case udt: UserDefinedType[_] => verifyType(name, udt.sqlType) + + case _ => + throw new UnsupportedOperationException( + s"Unable to convert column $name of type ${dataType.simpleString} to JSON.") + } + + schema.foreach(field => verifyType(field.name, field.dataType)) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 8462393..f9db649 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -343,4 +343,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { null ) } + + test("to_json") { + val schema = StructType(StructField("a", IntegerType) :: Nil) + val struct = Literal.create(create_row(1), schema) + checkEvaluation( + StructToJson(Map.empty, struct), + """{"a":1}""" + ) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6e0a247..eb2b20a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.json.JacksonGenerator import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -45,7 +46,6 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView} import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.DataStreamWriter import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala deleted file mode 100644 index 5b55b70..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.json - -import java.io.Writer - -import com.fasterxml.jackson.core._ - -import org.apache.spark.sql.catalyst.expressions.SpecializedGetters -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.json.JSONOptions -import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData} -import org.apache.spark.sql.types._ - -private[sql] class JacksonGenerator( - schema: StructType, - writer: Writer, - options: JSONOptions = new JSONOptions(Map.empty[String, String])) { - // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate - // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that - // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`. 
- private type ValueWriter = (SpecializedGetters, Int) => Unit - - // `ValueWriter`s for all fields of the schema - private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray - - private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) - - private def makeWriter(dataType: DataType): ValueWriter = dataType match { - case NullType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeNull() - - case BooleanType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeBoolean(row.getBoolean(ordinal)) - - case ByteType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeNumber(row.getByte(ordinal)) - - case ShortType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeNumber(row.getShort(ordinal)) - - case IntegerType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeNumber(row.getInt(ordinal)) - - case LongType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeNumber(row.getLong(ordinal)) - - case FloatType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeNumber(row.getFloat(ordinal)) - - case DoubleType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeNumber(row.getDouble(ordinal)) - - case StringType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeString(row.getUTF8String(ordinal).toString) - - case TimestampType => - (row: SpecializedGetters, ordinal: Int) => - val timestampString = - options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal))) - gen.writeString(timestampString) - - case DateType => - (row: SpecializedGetters, ordinal: Int) => - val dateString = - options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal))) - gen.writeString(dateString) - - case BinaryType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeBinary(row.getBinary(ordinal)) - - case dt: DecimalType => - (row: SpecializedGetters, ordinal: Int) => - gen.writeNumber(row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal) - - case st: StructType => - val fieldWriters = st.map(_.dataType).map(makeWriter) - (row: SpecializedGetters, ordinal: Int) => - writeObject(writeFields(row.getStruct(ordinal, st.length), st, fieldWriters)) - - case at: ArrayType => - val elementWriter = makeWriter(at.elementType) - (row: SpecializedGetters, ordinal: Int) => - writeArray(writeArrayData(row.getArray(ordinal), elementWriter)) - - case mt: MapType => - val valueWriter = makeWriter(mt.valueType) - (row: SpecializedGetters, ordinal: Int) => - writeObject(writeMapData(row.getMap(ordinal), mt, valueWriter)) - - // For UDT values, they should be in the SQL type's corresponding value type. - // We should not see values in the user-defined class at here. - // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is - // an ArrayData at here, instead of a Vector. 
- case t: UserDefinedType[_] => - makeWriter(t.sqlType) - - case _ => - (row: SpecializedGetters, ordinal: Int) => - val v = row.get(ordinal, dataType) - sys.error(s"Failed to convert value $v (class of ${v.getClass}}) " + - s"with the type of $dataType to JSON.") - } - - private def writeObject(f: => Unit): Unit = { - gen.writeStartObject() - f - gen.writeEndObject() - } - - private def writeFields( - row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = { - var i = 0 - while (i < row.numFields) { - val field = schema(i) - if (!row.isNullAt(i)) { - gen.writeFieldName(field.name) - fieldWriters(i).apply(row, i) - } - i += 1 - } - } - - private def writeArray(f: => Unit): Unit = { - gen.writeStartArray() - f - gen.writeEndArray() - } - - private def writeArrayData( - array: ArrayData, fieldWriter: ValueWriter): Unit = { - var i = 0 - while (i < array.numElements()) { - if (!array.isNullAt(i)) { - fieldWriter.apply(array, i) - } else { - gen.writeNull() - } - i += 1 - } - } - - private def writeMapData( - map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = { - val keyArray = map.keyArray() - val valueArray = map.valueArray() - var i = 0 - while (i < map.numElements()) { - gen.writeFieldName(keyArray.get(i, mapType.keyType).toString) - if (!valueArray.isNullAt(i)) { - fieldWriter.apply(valueArray, i) - } else { - gen.writeNull() - } - i += 1 - } - } - - def close(): Unit = gen.close() - - def flush(): Unit = gen.flush() - - /** - * Transforms a single InternalRow to JSON using Jackson - * - * @param row The row to convert - */ - def write(row: InternalRow): Unit = { - writeObject { - writeFields(row, schema, rootFieldWriters) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 5a409c0..0e38aef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions} import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextOutputWriter http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 5f1efd2..944a476 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2883,10 +2883,10 @@ object functions { * (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the * specified schema. 
Returns `null`, in the case of an unparseable string. * + * @param e a string column containing JSON data. * @param schema the schema to use when parsing the json string * @param options options to control how the json is parsed. accepts the same options and the * json data source. - * @param e a string column containing JSON data. * * @group collection_funcs * @since 2.1.0 @@ -2936,6 +2936,48 @@ object functions { def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column = from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options) + + /** + * (Scala-specific) Converts a column containing a [[StructType]] into a JSON string with the + * specified schema. Throws an exception in the case of an unsupported type. + * + * @param e a struct column. + * @param options options to control how the struct column is converted into a json string. + * accepts the same options as the json data source. + * + * @group collection_funcs + * @since 2.1.0 + */ + def to_json(e: Column, options: Map[String, String]): Column = withExpr { + StructToJson(options, e.expr) + } + + /** + * (Java-specific) Converts a column containing a [[StructType]] into a JSON string with the + * specified schema. Throws an exception in the case of an unsupported type. + * + * @param e a struct column. + * @param options options to control how the struct column is converted into a json string. + * accepts the same options as the json data source. + * + * @group collection_funcs + * @since 2.1.0 + */ + def to_json(e: Column, options: java.util.Map[String, String]): Column = + to_json(e, options.asScala.toMap) + + /** + * Converts a column containing a [[StructType]] into a JSON string with the + * specified schema. Throws an exception in the case of an unsupported type. + * + * @param e a struct column. + * + * @group collection_funcs + * @since 2.1.0 + */ + def to_json(e: Column): Column = + to_json(e, Map.empty[String, String]) + /** * Returns length of array or map.
* http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 518d6e9..59ae889 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql -import org.apache.spark.sql.functions.from_json +import org.apache.spark.sql.functions.{from_json, struct, to_json} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.{CalendarIntervalType, IntegerType, StructType} class JsonFunctionsSuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -31,7 +31,6 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row("alice", "5")) } - val tuples: Seq[(String, String)] = ("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") :: ("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") :: @@ -97,7 +96,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(expr, expected) } - test("json_parser") { + test("from_json") { val df = Seq("""{"a": 1}""").toDS() val schema = new StructType().add("a", IntegerType) @@ -106,7 +105,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row(Row(1)) :: Nil) } - test("json_parser missing columns") { + test("from_json missing columns") { val df = Seq("""{"a": 1}""").toDS() val schema = new StructType().add("b", IntegerType) @@ -115,7 +114,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row(Row(null)) :: Nil) } - test("json_parser invalid json") { + test("from_json invalid json") { val df = Seq("""{"a" 1}""").toDS() val schema = new StructType().add("a", IntegerType) @@ -123,4 +122,23 @@ df.select(from_json($"value", schema)), Row(null) :: Nil) } + + test("to_json") { + val df = Seq(Tuple1(Tuple1(1))).toDF("a") + + checkAnswer( + df.select(to_json($"a")), + Row("""{"_1":1}""") :: Nil) + } + + test("to_json unsupported type") { + val df = Seq(Tuple1(Tuple1("interval -3 month 7 hours"))).toDF("a") + .select(struct($"a._1".cast(CalendarIntervalType).as("a")).as("c")) + val e = intercept[AnalysisException]{ + // Unsupported type throws an exception + df.select(to_json($"c")).collect() + } + assert(e.getMessage.contains( + "Unable to convert column a of type calendarinterval to JSON.")) + } }
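Since the new `JacksonGenerator` reads `timestampFormat` and `dateFormat` from `JSONOptions` (see its `TimestampType` and `DateType` cases above), the options overload of `to_json` can customize the generated string. A minimal sketch, not part of the patch, again assuming a Spark 2.1 session with `spark.implicits._` in scope; the `events` DataFrame and its `time` column are hypothetical:

```scala
import org.apache.spark.sql.functions.{struct, to_json}

// Wrap a timestamp column in a struct and serialize it with a custom
// timestampFormat, which to_json passes through to JSONOptions.
val events = Seq(Tuple1(java.sql.Timestamp.valueOf("2016-11-01 12:46:41"))).toDF("time")
events
  .select(to_json(struct($"time"), Map("timestampFormat" -> "yyyy/MM/dd HH:mm")).as("json"))
  .show(false)
// Expected shape: {"time":"2016/11/01 12:46"}
```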