Repository: spark Updated Branches: refs/heads/master 9914b1b2c -> 45d798c32
[SPARK-8278] Remove non-streaming JSON reader. Author: Reynold Xin <r...@databricks.com> Closes #7501 from rxin/jsonrdd and squashes the following commits: 767ec55 [Reynold Xin] More Mima 51f456e [Reynold Xin] Mima exclude. 789cb80 [Reynold Xin] Fixed compilation error. b4cf50d [Reynold Xin] [SPARK-8278] Remove non-streaming JSON reader. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45d798c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45d798c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45d798c3 Branch: refs/heads/master Commit: 45d798c323ffe32bc2eba4dbd271c4572f5a30cf Parents: 9914b1b Author: Reynold Xin <r...@databricks.com> Authored: Sat Jul 18 20:27:55 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Sat Jul 18 20:27:55 2015 -0700 ---------------------------------------------------------------------- project/MimaExcludes.scala | 3 + .../org/apache/spark/sql/DataFrameReader.scala | 15 +- .../scala/org/apache/spark/sql/SQLConf.scala | 5 - .../apache/spark/sql/json/JSONRelation.scala | 48 +- .../org/apache/spark/sql/json/JsonRDD.scala | 449 ------------------- .../org/apache/spark/sql/json/JsonSuite.scala | 27 +- 6 files changed, 29 insertions(+), 518 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 4e4e810..36417f5 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -64,6 +64,9 @@ object MimaExcludes { excludePackage("org.apache.spark.sql.execution"), // Parquet support is considered private. excludePackage("org.apache.spark.sql.parquet"), + // The old JSON RDD is removed in favor of streaming Jackson + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"), // local function inside a method ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.sql.SQLContext.org$apache$spark$sql$SQLContext$$needsConversion$1") http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 9ad6e21..9b23df4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -27,7 +27,7 @@ import org.apache.spark.api.java.JavaRDD import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} -import org.apache.spark.sql.json.{JsonRDD, JSONRelation} +import org.apache.spark.sql.json.JSONRelation import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource} import org.apache.spark.sql.types.StructType @@ -236,17 +236,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) { */ def json(jsonRDD: RDD[String]): DataFrame = { val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble - if (sqlContext.conf.useJacksonStreamingAPI) { - sqlContext.baseRelationToDataFrame( - new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext)) - } else { - val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord - val appliedSchema = userSpecifiedSchema.getOrElse( - JsonRDD.nullTypeToStringType( - JsonRDD.inferSchema(jsonRDD, 1.0, columnNameOfCorruptJsonRecord))) - val rowRDD = JsonRDD.jsonStringToRow(jsonRDD, appliedSchema, columnNameOfCorruptJsonRecord) - sqlContext.internalCreateDataFrame(rowRDD, appliedSchema) - } + sqlContext.baseRelationToDataFrame( + new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 2c2f7c3..84d3271 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -401,9 +401,6 @@ private[spark] object SQLConf { "spark.sql.useSerializer2", defaultValue = Some(true), isPublic = false) - val USE_JACKSON_STREAMING_API = booleanConf("spark.sql.json.useJacksonStreamingAPI", - defaultValue = Some(true), doc = "<TODO>") - object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -473,8 +470,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2) - private[spark] def useJacksonStreamingAPI: Boolean = getConf(USE_JACKSON_STREAMING_API) - private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD) private[spark] def defaultSizeInBytes: Long = http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 2361d3b..25802d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -157,51 +157,27 @@ private[sql] class JSONRelation( } } - private val useJacksonStreamingAPI: Boolean = sqlContext.conf.useJacksonStreamingAPI - override val needConversion: Boolean = false override lazy val schema = userSpecifiedSchema.getOrElse { - if (useJacksonStreamingAPI) { - InferSchema( - baseRDD(), - samplingRatio, - sqlContext.conf.columnNameOfCorruptRecord) - } else { - JsonRDD.nullTypeToStringType( - JsonRDD.inferSchema( - baseRDD(), - samplingRatio, - sqlContext.conf.columnNameOfCorruptRecord)) - } + InferSchema( + baseRDD(), + samplingRatio, + sqlContext.conf.columnNameOfCorruptRecord) } override def buildScan(): RDD[Row] = { - if (useJacksonStreamingAPI) { - JacksonParser( - baseRDD(), - schema, - sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row]) - } else { - JsonRDD.jsonStringToRow( - baseRDD(), - schema, - sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row]) - } + JacksonParser( + baseRDD(), + schema, + sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row]) } override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = { - if (useJacksonStreamingAPI) { - JacksonParser( - baseRDD(), - StructType.fromAttributes(requiredColumns), - sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row]) - } else { - JsonRDD.jsonStringToRow( - baseRDD(), - StructType.fromAttributes(requiredColumns), - sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row]) - } + JacksonParser( + baseRDD(), + StructType.fromAttributes(requiredColumns), + sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row]) } override def insert(data: DataFrame, overwrite: Boolean): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala deleted file mode 100644 index b392a51..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ /dev/null @@ -1,449 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.json - -import scala.collection.Map -import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper} - -import com.fasterxml.jackson.core.JsonProcessingException -import com.fasterxml.jackson.databind.ObjectMapper - -import org.apache.spark.Logging -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - - -private[sql] object JsonRDD extends Logging { - - private[sql] def jsonStringToRow( - json: RDD[String], - schema: StructType, - columnNameOfCorruptRecords: String): RDD[InternalRow] = { - parseJson(json, columnNameOfCorruptRecords).map(parsed => asRow(parsed, schema)) - } - - private[sql] def inferSchema( - json: RDD[String], - samplingRatio: Double = 1.0, - columnNameOfCorruptRecords: String): StructType = { - require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0") - val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1) - val allKeys = - if (schemaData.isEmpty()) { - Set.empty[(String, DataType)] - } else { - parseJson(schemaData, columnNameOfCorruptRecords).map(allKeysWithValueTypes).reduce(_ ++ _) - } - createSchema(allKeys) - } - - private def createSchema(allKeys: Set[(String, DataType)]): StructType = { - // Resolve type conflicts - val resolved = allKeys.groupBy { - case (key, dataType) => key - }.map { - // Now, keys and types are organized in the format of - // key -> Set(type1, type2, ...). - case (key, typeSet) => { - val fieldName = key.substring(1, key.length - 1).split("`.`").toSeq - val dataType = typeSet.map { - case (_, dataType) => dataType - }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2)) - - (fieldName, dataType) - } - } - - def makeStruct(values: Seq[Seq[String]], prefix: Seq[String]): StructType = { - val (topLevel, structLike) = values.partition(_.size == 1) - - val topLevelFields = topLevel.filter { - name => resolved.get(prefix ++ name).get match { - case ArrayType(elementType, _) => { - def hasInnerStruct(t: DataType): Boolean = t match { - case s: StructType => true - case ArrayType(t1, _) => hasInnerStruct(t1) - case o => false - } - - // Check if this array has inner struct. - !hasInnerStruct(elementType) - } - case struct: StructType => false - case _ => true - } - }.map { - a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true) - } - val topLevelFieldNameSet = topLevelFields.map(_.name) - - val structFields: Seq[StructField] = structLike.groupBy(_(0)).filter { - case (name, _) => !topLevelFieldNameSet.contains(name) - }.map { - case (name, fields) => { - val nestedFields = fields.map(_.tail) - val structType = makeStruct(nestedFields, prefix :+ name) - val dataType = resolved.get(prefix :+ name).get - dataType match { - case array: ArrayType => - // The pattern of this array is ArrayType(...(ArrayType(StructType))). - // Since the inner struct of array is a placeholder (StructType(Nil)), - // we need to replace this placeholder with the actual StructType (structType). - def getActualArrayType( - innerStruct: StructType, - currentArray: ArrayType): ArrayType = currentArray match { - case ArrayType(s: StructType, containsNull) => - ArrayType(innerStruct, containsNull) - case ArrayType(a: ArrayType, containsNull) => - ArrayType(getActualArrayType(innerStruct, a), containsNull) - } - Some(StructField(name, getActualArrayType(structType, array), nullable = true)) - case struct: StructType => Some(StructField(name, structType, nullable = true)) - // dataType is StringType means that we have resolved type conflicts involving - // primitive types and complex types. So, the type of name has been relaxed to - // StringType. Also, this field should have already been put in topLevelFields. - case StringType => None - } - } - }.flatMap(field => field).toSeq - - StructType((topLevelFields ++ structFields).sortBy(_.name)) - } - - makeStruct(resolved.keySet.toSeq, Nil) - } - - private[sql] def nullTypeToStringType(struct: StructType): StructType = { - val fields = struct.fields.map { - case StructField(fieldName, dataType, nullable, _) => { - val newType = dataType match { - case NullType => StringType - case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull) - case ArrayType(struct: StructType, containsNull) => - ArrayType(nullTypeToStringType(struct), containsNull) - case struct: StructType => nullTypeToStringType(struct) - case other: DataType => other - } - StructField(fieldName, newType, nullable) - } - } - - StructType(fields) - } - - /** - * Returns the most general data type for two given data types. - */ - private[json] def compatibleType(t1: DataType, t2: DataType): DataType = { - HiveTypeCoercion.findTightestCommonTypeOfTwo(t1, t2) match { - case Some(commonType) => commonType - case None => - // t1 or t2 is a StructType, ArrayType, or an unexpected type. - (t1, t2) match { - case (other: DataType, NullType) => other - case (NullType, other: DataType) => other - case (StructType(fields1), StructType(fields2)) => { - val newFields = (fields1 ++ fields2).groupBy(field => field.name).map { - case (name, fieldTypes) => { - val dataType = fieldTypes.map(field => field.dataType).reduce( - (type1: DataType, type2: DataType) => compatibleType(type1, type2)) - StructField(name, dataType, true) - } - } - StructType(newFields.toSeq.sortBy(_.name)) - } - case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) => - ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2) - // TODO: We should use JsonObjectStringType to mark that values of field will be - // strings and every string is a Json object. - case (_, _) => StringType - } - } - } - - private def typeOfPrimitiveValue: PartialFunction[Any, DataType] = { - // For Integer values, use LongType by default. - val useLongType: PartialFunction[Any, DataType] = { - case value: IntegerType.InternalType => LongType - } - - useLongType orElse ScalaReflection.typeOfObject orElse { - // Since we do not have a data type backed by BigInteger, - // when we see a Java BigInteger, we use DecimalType. - case value: java.math.BigInteger => DecimalType.Unlimited - // DecimalType's JVMType is scala BigDecimal. - case value: java.math.BigDecimal => DecimalType.Unlimited - // Unexpected data type. - case _ => StringType - } - } - - /** - * Returns the element type of an JSON array. We go through all elements of this array - * to detect any possible type conflict. We use [[compatibleType]] to resolve - * type conflicts. - */ - private def typeOfArray(l: Seq[Any]): ArrayType = { - val elements = l.flatMap(v => Option(v)) - if (elements.isEmpty) { - // If this JSON array is empty, we use NullType as a placeholder. - // If this array is not empty in other JSON objects, we can resolve - // the type after we have passed through all JSON objects. - ArrayType(NullType, containsNull = true) - } else { - val elementType = elements.map { - e => e match { - case map: Map[_, _] => StructType(Nil) - // We have an array of arrays. If those element arrays do not have the same - // element types, we will return ArrayType[StringType]. - case seq: Seq[_] => typeOfArray(seq) - case value => typeOfPrimitiveValue(value) - } - }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2)) - - ArrayType(elementType, containsNull = true) - } - } - - /** - * Figures out all key names and data types of values from a parsed JSON object - * (in the format of Map[Stirng, Any]). When the value of a key is an JSON object, we - * only use a placeholder (StructType(Nil)) to mark that it should be a struct - * instead of getting all fields of this struct because a field does not appear - * in this JSON object can appear in other JSON objects. - */ - private def allKeysWithValueTypes(m: Map[String, Any]): Set[(String, DataType)] = { - val keyValuePairs = m.map { - // Quote the key with backticks to handle cases which have dots - // in the field name. - case (key, value) => (s"`$key`", value) - }.toSet - keyValuePairs.flatMap { - case (key: String, struct: Map[_, _]) => { - // The value associated with the key is an JSON object. - allKeysWithValueTypes(struct.asInstanceOf[Map[String, Any]]).map { - case (k, dataType) => (s"$key.$k", dataType) - } ++ Set((key, StructType(Nil))) - } - case (key: String, array: Seq[_]) => { - // The value associated with the key is an array. - // Handle inner structs of an array. - def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match { - case ArrayType(e: StructType, _) => { - // The elements of this arrays are structs. - v.asInstanceOf[Seq[Map[String, Any]]].flatMap(Option(_)).flatMap { - element => allKeysWithValueTypes(element) - }.map { - case (k, t) => (s"$key.$k", t) - } - } - case ArrayType(t1, _) => - v.asInstanceOf[Seq[Any]].flatMap(Option(_)).flatMap { - element => buildKeyPathForInnerStructs(element, t1) - } - case other => Nil - } - val elementType = typeOfArray(array) - buildKeyPathForInnerStructs(array, elementType) :+ (key, elementType) - } - // we couldn't tell what the type is if the value is null or empty string - case (key: String, value) if value == "" || value == null => (key, NullType) :: Nil - case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil - } - } - - /** - * Converts a Java Map/List to a Scala Map/Seq. - * We do not use Jackson's scala module at here because - * DefaultScalaModule in jackson-module-scala will make - * the parsing very slow. - */ - private def scalafy(obj: Any): Any = obj match { - case map: java.util.Map[_, _] => - // .map(identity) is used as a workaround of non-serializable Map - // generated by .mapValues. - // This issue is documented at https://issues.scala-lang.org/browse/SI-7005 - JMapWrapper(map).mapValues(scalafy).map(identity) - case list: java.util.List[_] => - JListWrapper(list).map(scalafy) - case atom => atom - } - - private def parseJson( - json: RDD[String], - columnNameOfCorruptRecords: String): RDD[Map[String, Any]] = { - // According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72], - // ObjectMapper will not return BigDecimal when - // "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled - // (see NumberDeserializer.deserialize for the logic). - // But, we do not want to enable this feature because it will use BigDecimal - // for every float number, which will be slow. - // So, right now, we will have Infinity for those BigDecimal number. - // TODO: Support BigDecimal. - json.mapPartitions(iter => { - // When there is a key appearing multiple times (a duplicate key), - // the ObjectMapper will take the last value associated with this duplicate key. - // For example: for {"key": 1, "key":2}, we will get "key"->2. - val mapper = new ObjectMapper() - iter.flatMap { record => - try { - val parsed = mapper.readValue(record, classOf[Object]) match { - case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil - case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]] - case _ => - sys.error( - s"Failed to parse record $record. Please make sure that each line of the file " + - "(or each string in the RDD) is a valid JSON object or an array of JSON objects.") - } - - parsed - } catch { - case e: JsonProcessingException => - Map(columnNameOfCorruptRecords -> UTF8String.fromString(record)) :: Nil - } - } - }) - } - - private def toLong(value: Any): Long = { - value match { - case value: java.lang.Integer => value.asInstanceOf[Int].toLong - case value: java.lang.Long => value.asInstanceOf[Long] - } - } - - private def toDouble(value: Any): Double = { - value match { - case value: java.lang.Integer => value.asInstanceOf[Int].toDouble - case value: java.lang.Long => value.asInstanceOf[Long].toDouble - case value: java.lang.Double => value.asInstanceOf[Double] - } - } - - private def toDecimal(value: Any): Decimal = { - value match { - case value: java.lang.Integer => Decimal(value) - case value: java.lang.Long => Decimal(value) - case value: java.math.BigInteger => Decimal(new java.math.BigDecimal(value)) - case value: java.lang.Double => Decimal(value) - case value: java.math.BigDecimal => Decimal(value) - } - } - - private def toJsonArrayString(seq: Seq[Any]): String = { - val builder = new StringBuilder - builder.append("[") - var count = 0 - seq.foreach { - element => - if (count > 0) builder.append(",") - count += 1 - builder.append(toString(element)) - } - builder.append("]") - - builder.toString() - } - - private def toJsonObjectString(map: Map[String, Any]): String = { - val builder = new StringBuilder - builder.append("{") - var count = 0 - map.foreach { - case (key, value) => - if (count > 0) builder.append(",") - count += 1 - val stringValue = if (value.isInstanceOf[String]) s"""\"$value\"""" else toString(value) - builder.append(s"""\"${key}\":${stringValue}""") - } - builder.append("}") - - builder.toString() - } - - private def toString(value: Any): String = { - value match { - case value: Map[_, _] => toJsonObjectString(value.asInstanceOf[Map[String, Any]]) - case value: Seq[_] => toJsonArrayString(value) - case value => Option(value).map(_.toString).orNull - } - } - - private def toDate(value: Any): Int = { - value match { - // only support string as date - case value: java.lang.String => - DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(value).getTime) - case value: java.sql.Date => DateTimeUtils.fromJavaDate(value) - } - } - - private def toTimestamp(value: Any): Long = { - value match { - case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 1000L - case value: java.lang.Long => value * 1000L - case value: java.lang.String => DateTimeUtils.stringToTime(value).getTime * 1000L - } - } - - private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any = { - if (value == null) { - null - } else { - desiredType match { - case StringType => UTF8String.fromString(toString(value)) - case _ if value == null || value == "" => null // guard the non string type - case IntegerType => value.asInstanceOf[IntegerType.InternalType] - case LongType => toLong(value) - case DoubleType => toDouble(value) - case DecimalType() => toDecimal(value) - case BooleanType => value.asInstanceOf[BooleanType.InternalType] - case NullType => null - case ArrayType(elementType, _) => - value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType)) - case MapType(StringType, valueType, _) => - val map = value.asInstanceOf[Map[String, Any]] - map.map { - case (k, v) => - (UTF8String.fromString(k), enforceCorrectType(v, valueType)) - }.map(identity) - case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct) - case DateType => toDate(value) - case TimestampType => toTimestamp(value) - } - } - } - - private def asRow(json: Map[String, Any], schema: StructType): InternalRow = { - // TODO: Reuse the row instead of creating a new one for every record. - val row = new GenericMutableRow(schema.fields.length) - schema.fields.zipWithIndex.foreach { - case (StructField(name, dataType, _, _), i) => - row.update(i, json.get(name).flatMap(v => Option(v)).map( - enforceCorrectType(_, dataType)).orNull) - } - - row - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index 8204a58..3475f9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -1079,28 +1079,23 @@ class JsonSuite extends QueryTest with TestJsonData { } test("SPARK-7565 MapType in JsonRDD") { - val useStreaming = ctx.conf.useJacksonStreamingAPI val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed") val schemaWithSimpleMap = StructType( StructField("map", MapType(StringType, IntegerType, true), false) :: Nil) - try{ - for (useStreaming <- List(true, false)) { - ctx.setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming) - val temp = Utils.createTempDir().getPath - - val df = ctx.read.schema(schemaWithSimpleMap).json(mapType1) - df.write.mode("overwrite").parquet(temp) - // order of MapType is not defined - assert(ctx.read.parquet(temp).count() == 5) - - val df2 = ctx.read.json(corruptRecords) - df2.write.mode("overwrite").parquet(temp) - checkAnswer(ctx.read.parquet(temp), df2.collect()) - } + try { + val temp = Utils.createTempDir().getPath + + val df = ctx.read.schema(schemaWithSimpleMap).json(mapType1) + df.write.mode("overwrite").parquet(temp) + // order of MapType is not defined + assert(ctx.read.parquet(temp).count() == 5) + + val df2 = ctx.read.json(corruptRecords) + df2.write.mode("overwrite").parquet(temp) + checkAnswer(ctx.read.parquet(temp), df2.collect()) } finally { - ctx.setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming) ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org