Repository: spark
Updated Branches:
  refs/heads/master 9914b1b2c -> 45d798c32


[SPARK-8278] Remove non-streaming JSON reader.

Author: Reynold Xin <r...@databricks.com>

Closes #7501 from rxin/jsonrdd and squashes the following commits:

767ec55 [Reynold Xin] More Mima
51f456e [Reynold Xin] Mima exclude.
789cb80 [Reynold Xin] Fixed compilation error.
b4cf50d [Reynold Xin] [SPARK-8278] Remove non-streaming JSON reader.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45d798c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45d798c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45d798c3

Branch: refs/heads/master
Commit: 45d798c323ffe32bc2eba4dbd271c4572f5a30cf
Parents: 9914b1b
Author: Reynold Xin <r...@databricks.com>
Authored: Sat Jul 18 20:27:55 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat Jul 18 20:27:55 2015 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |   3 +
 .../org/apache/spark/sql/DataFrameReader.scala  |  15 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |   5 -
 .../apache/spark/sql/json/JSONRelation.scala    |  48 +-
 .../org/apache/spark/sql/json/JsonRDD.scala     | 449 -------------------
 .../org/apache/spark/sql/json/JsonSuite.scala   |  27 +-
 6 files changed, 29 insertions(+), 518 deletions(-)
----------------------------------------------------------------------
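
For context, the caller-facing API is unchanged by this commit; only the parsing path behind it is. A minimal sketch (app name, object name, and sample records are illustrative) of reading JSON from an RDD[String], which now always goes through JSONRelation and the streaming Jackson parser:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object JsonReadSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("json-read-sketch").setMaster("local[*]"))
        val sqlContext = new SQLContext(sc)

        // Each element is one JSON object; the data is made up.
        val jsonRDD = sc.parallelize(Seq(
          """{"name": "a", "age": 1}""",
          """{"name": "b", "age": 2}"""))

        // After this commit, this call is always parsed by the streaming path.
        val df = sqlContext.read.json(jsonRDD)
        df.printSchema()
        df.show()
      }
    }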


http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4e4e810..36417f5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -64,6 +64,9 @@ object MimaExcludes {
             excludePackage("org.apache.spark.sql.execution"),
             // Parquet support is considered private.
             excludePackage("org.apache.spark.sql.parquet"),
+            // The old JSON RDD is removed in favor of streaming Jackson
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
             // local function inside a method
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.sql.SQLContext.org$apache$spark$sql$SQLContext$$needsConversion$1")

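For readers unfamiliar with MiMa: deleting a public class from a published artifact trips MissingClassProblem in the binary-compatibility check, and a Scala object compiles to two class files, so both names must be excluded. A sketch of the filter shape, assuming the standard sbt-mima-plugin imports:

    import com.typesafe.tools.mima.core._

    // "JsonRDD$" is the compiled object; "JsonRDD" is the class file
    // holding its static forwarders. Removing the object drops both.
    val jsonRddRemovalFilters = Seq(
      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$")
    )
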
http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9ad6e21..9b23df4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,7 +27,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.json.{JsonRDD, JSONRelation}
+import org.apache.spark.sql.json.JSONRelation
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
@@ -236,17 +236,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
     val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
-    if (sqlContext.conf.useJacksonStreamingAPI) {
-      sqlContext.baseRelationToDataFrame(
-        new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
-    } else {
-      val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
-      val appliedSchema = userSpecifiedSchema.getOrElse(
-        JsonRDD.nullTypeToStringType(
-          JsonRDD.inferSchema(jsonRDD, 1.0, columnNameOfCorruptJsonRecord)))
-      val rowRDD = JsonRDD.jsonStringToRow(jsonRDD, appliedSchema, columnNameOfCorruptJsonRecord)
-      sqlContext.internalCreateDataFrame(rowRDD, appliedSchema)
-    }
+    sqlContext.baseRelationToDataFrame(
+      new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
   }
 
   /**

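The samplingRatio read from extraOptions above remains the one option json(jsonRDD) honors: it controls what fraction of the input is scanned during schema inference. A hedged usage sketch (readSampled and events are illustrative names):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Infer the schema from roughly 10% of the records; the full RDD
    // is still parsed once the inferred schema is fixed.
    def readSampled(sqlContext: SQLContext, events: RDD[String]): DataFrame =
      sqlContext.read.option("samplingRatio", "0.1").json(events)
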
http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2c2f7c3..84d3271 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -401,9 +401,6 @@ private[spark] object SQLConf {
     "spark.sql.useSerializer2",
     defaultValue = Some(true), isPublic = false)
 
-  val USE_JACKSON_STREAMING_API = booleanConf("spark.sql.json.useJacksonStreamingAPI",
-    defaultValue = Some(true), doc = "<TODO>")
-
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -473,8 +470,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2)
 
-  private[spark] def useJacksonStreamingAPI: Boolean = getConf(USE_JACKSON_STREAMING_API)
-
   private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
   private[spark] def defaultSizeInBytes: Long =

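A migration sketch: code that pinned the old parser through this key should simply be deleted. SQLConf accepts arbitrary string keys, so the call below still compiles and runs after this commit, but nothing reads the value anymore:

    // Previously selected the pre-streaming JsonRDD path; now a no-op.
    sqlContext.setConf("spark.sql.json.useJacksonStreamingAPI", "false")
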
http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 2361d3b..25802d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -157,51 +157,27 @@ private[sql] class JSONRelation(
     }
   }
 
-  private val useJacksonStreamingAPI: Boolean = sqlContext.conf.useJacksonStreamingAPI
-
   override val needConversion: Boolean = false
 
   override lazy val schema = userSpecifiedSchema.getOrElse {
-    if (useJacksonStreamingAPI) {
-      InferSchema(
-        baseRDD(),
-        samplingRatio,
-        sqlContext.conf.columnNameOfCorruptRecord)
-    } else {
-      JsonRDD.nullTypeToStringType(
-        JsonRDD.inferSchema(
-          baseRDD(),
-          samplingRatio,
-          sqlContext.conf.columnNameOfCorruptRecord))
-    }
+    InferSchema(
+      baseRDD(),
+      samplingRatio,
+      sqlContext.conf.columnNameOfCorruptRecord)
   }
 
   override def buildScan(): RDD[Row] = {
-    if (useJacksonStreamingAPI) {
-      JacksonParser(
-        baseRDD(),
-        schema,
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    } else {
-      JsonRDD.jsonStringToRow(
-        baseRDD(),
-        schema,
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    }
+    JacksonParser(
+      baseRDD(),
+      schema,
+      sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
   }
 
   override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
-    if (useJacksonStreamingAPI) {
-      JacksonParser(
-        baseRDD(),
-        StructType.fromAttributes(requiredColumns),
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    } else {
-      JsonRDD.jsonStringToRow(
-        baseRDD(),
-        StructType.fromAttributes(requiredColumns),
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    }
+    JacksonParser(
+      baseRDD(),
+      StructType.fromAttributes(requiredColumns),
+      sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
   }
 
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {

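Corrupt-record handling is preserved across the switch: JacksonParser routes unparseable lines into the column named by spark.sql.columnNameOfCorruptRecord (default _corrupt_record), as the removed JsonRDD.parseJson did. A sketch, assuming the sc and sqlContext from the earlier example:

    // The second record is deliberately malformed.
    val mixed = sc.parallelize(Seq(
      """{"a": 1}""",
      """{"a": bad"""))

    val df = sqlContext.read.json(mixed)
    df.printSchema()  // should include: _corrupt_record: string
    df.filter("_corrupt_record is not null").show()
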
http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
deleted file mode 100644
index b392a51..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.json
-
-import scala.collection.Map
-import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
-
-import com.fasterxml.jackson.core.JsonProcessingException
-import com.fasterxml.jackson.databind.ObjectMapper
-
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-
-private[sql] object JsonRDD extends Logging {
-
-  private[sql] def jsonStringToRow(
-      json: RDD[String],
-      schema: StructType,
-      columnNameOfCorruptRecords: String): RDD[InternalRow] = {
-    parseJson(json, columnNameOfCorruptRecords).map(parsed => asRow(parsed, schema))
-  }
-
-  private[sql] def inferSchema(
-      json: RDD[String],
-      samplingRatio: Double = 1.0,
-      columnNameOfCorruptRecords: String): StructType = {
-    require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
-    val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
-    val allKeys =
-      if (schemaData.isEmpty()) {
-        Set.empty[(String, DataType)]
-      } else {
-        parseJson(schemaData, columnNameOfCorruptRecords).map(allKeysWithValueTypes).reduce(_ ++ _)
-      }
-    createSchema(allKeys)
-  }
-
-  private def createSchema(allKeys: Set[(String, DataType)]): StructType = {
-    // Resolve type conflicts
-    val resolved = allKeys.groupBy {
-      case (key, dataType) => key
-    }.map {
-      // Now, keys and types are organized in the format of
-      // key -> Set(type1, type2, ...).
-      case (key, typeSet) => {
-        val fieldName = key.substring(1, key.length - 1).split("`.`").toSeq
-        val dataType = typeSet.map {
-          case (_, dataType) => dataType
-        }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
-
-        (fieldName, dataType)
-      }
-    }
-
-    def makeStruct(values: Seq[Seq[String]], prefix: Seq[String]): StructType = {
-      val (topLevel, structLike) = values.partition(_.size == 1)
-
-      val topLevelFields = topLevel.filter {
-        name => resolved.get(prefix ++ name).get match {
-          case ArrayType(elementType, _) => {
-            def hasInnerStruct(t: DataType): Boolean = t match {
-              case s: StructType => true
-              case ArrayType(t1, _) => hasInnerStruct(t1)
-              case o => false
-            }
-
-            // Check if this array has inner struct.
-            !hasInnerStruct(elementType)
-          }
-          case struct: StructType => false
-          case _ => true
-        }
-      }.map {
-        a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true)
-      }
-      val topLevelFieldNameSet = topLevelFields.map(_.name)
-
-      val structFields: Seq[StructField] = structLike.groupBy(_(0)).filter {
-        case (name, _) => !topLevelFieldNameSet.contains(name)
-      }.map {
-        case (name, fields) => {
-          val nestedFields = fields.map(_.tail)
-          val structType = makeStruct(nestedFields, prefix :+ name)
-          val dataType = resolved.get(prefix :+ name).get
-          dataType match {
-            case array: ArrayType =>
-              // The pattern of this array is ArrayType(...(ArrayType(StructType))).
-              // Since the inner struct of array is a placeholder (StructType(Nil)),
-              // we need to replace this placeholder with the actual StructType (structType).
-              def getActualArrayType(
-                  innerStruct: StructType,
-                  currentArray: ArrayType): ArrayType = currentArray match {
-                case ArrayType(s: StructType, containsNull) =>
-                  ArrayType(innerStruct, containsNull)
-                case ArrayType(a: ArrayType, containsNull) =>
-                  ArrayType(getActualArrayType(innerStruct, a), containsNull)
-              }
-              Some(StructField(name, getActualArrayType(structType, array), nullable = true))
-            case struct: StructType => Some(StructField(name, structType, nullable = true))
-            // A dataType of StringType means that we have resolved type conflicts involving
-            // primitive types and complex types, so the type of this name has been relaxed to
-            // StringType. Also, this field should have already been put in topLevelFields.
-            case StringType => None
-          }
-        }
-      }.flatMap(field => field).toSeq
-
-      StructType((topLevelFields ++ structFields).sortBy(_.name))
-    }
-
-    makeStruct(resolved.keySet.toSeq, Nil)
-  }
-
-  private[sql] def nullTypeToStringType(struct: StructType): StructType = {
-    val fields = struct.fields.map {
-      case StructField(fieldName, dataType, nullable, _) => {
-        val newType = dataType match {
-          case NullType => StringType
-          case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
-          case ArrayType(struct: StructType, containsNull) =>
-            ArrayType(nullTypeToStringType(struct), containsNull)
-          case struct: StructType => nullTypeToStringType(struct)
-          case other: DataType => other
-        }
-        StructField(fieldName, newType, nullable)
-      }
-    }
-
-    StructType(fields)
-  }
-
-  /**
-   * Returns the most general data type for two given data types.
-   */
-  private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
-    HiveTypeCoercion.findTightestCommonTypeOfTwo(t1, t2) match {
-      case Some(commonType) => commonType
-      case None =>
-        // t1 or t2 is a StructType, ArrayType, or an unexpected type.
-        (t1, t2) match {
-          case (other: DataType, NullType) => other
-          case (NullType, other: DataType) => other
-          case (StructType(fields1), StructType(fields2)) => {
-            val newFields = (fields1 ++ fields2).groupBy(field => field.name).map {
-              case (name, fieldTypes) => {
-                val dataType = fieldTypes.map(field => field.dataType).reduce(
-                  (type1: DataType, type2: DataType) => compatibleType(type1, type2))
-                StructField(name, dataType, true)
-              }
-            }
-            StructType(newFields.toSeq.sortBy(_.name))
-          }
-          case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
-            ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)
-          // TODO: We should use JsonObjectStringType to mark that values of the field will be
-          // strings and every string is a JSON object.
-          case (_, _) => StringType
-        }
-    }
-  }
-
-  private def typeOfPrimitiveValue: PartialFunction[Any, DataType] = {
-    // For Integer values, use LongType by default.
-    val useLongType: PartialFunction[Any, DataType] = {
-      case value: IntegerType.InternalType => LongType
-    }
-
-    useLongType orElse ScalaReflection.typeOfObject orElse {
-      // Since we do not have a data type backed by BigInteger,
-      // when we see a Java BigInteger, we use DecimalType.
-      case value: java.math.BigInteger => DecimalType.Unlimited
-      // DecimalType's JVMType is scala BigDecimal.
-      case value: java.math.BigDecimal => DecimalType.Unlimited
-      // Unexpected data type.
-      case _ => StringType
-    }
-  }
-
-  /**
-   * Returns the element type of a JSON array. We go through all elements of this array
-   * to detect any possible type conflict. We use [[compatibleType]] to resolve
-   * type conflicts.
-   */
-  private def typeOfArray(l: Seq[Any]): ArrayType = {
-    val elements = l.flatMap(v => Option(v))
-    if (elements.isEmpty) {
-      // If this JSON array is empty, we use NullType as a placeholder.
-      // If this array is not empty in other JSON objects, we can resolve
-      // the type after we have passed through all JSON objects.
-      ArrayType(NullType, containsNull = true)
-    } else {
-      val elementType = elements.map {
-        e => e match {
-          case map: Map[_, _] => StructType(Nil)
-          // We have an array of arrays. If those element arrays do not have the same
-          // element types, we will return ArrayType[StringType].
-          case seq: Seq[_] => typeOfArray(seq)
-          case value => typeOfPrimitiveValue(value)
-        }
-      }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
-
-      ArrayType(elementType, containsNull = true)
-    }
-  }
-
-  /**
-   * Figures out all key names and data types of values from a parsed JSON object
-   * (in the format of Map[String, Any]). When the value of a key is a JSON object, we
-   * only use a placeholder (StructType(Nil)) to mark that it should be a struct
-   * instead of getting all fields of this struct, because a field that does not appear
-   * in this JSON object can appear in other JSON objects.
-   */
-  private def allKeysWithValueTypes(m: Map[String, Any]): Set[(String, DataType)] = {
-    val keyValuePairs = m.map {
-      // Quote the key with backticks to handle cases which have dots
-      // in the field name.
-      case (key, value) => (s"`$key`", value)
-    }.toSet
-    keyValuePairs.flatMap {
-      case (key: String, struct: Map[_, _]) => {
-        // The value associated with the key is a JSON object.
-        allKeysWithValueTypes(struct.asInstanceOf[Map[String, Any]]).map {
-          case (k, dataType) => (s"$key.$k", dataType)
-        } ++ Set((key, StructType(Nil)))
-      }
-      case (key: String, array: Seq[_]) => {
-        // The value associated with the key is an array.
-        // Handle inner structs of an array.
-        def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match {
-          case ArrayType(e: StructType, _) => {
-            // The elements of this array are structs.
-            v.asInstanceOf[Seq[Map[String, Any]]].flatMap(Option(_)).flatMap {
-              element => allKeysWithValueTypes(element)
-            }.map {
-              case (k, t) => (s"$key.$k", t)
-            }
-          }
-          case ArrayType(t1, _) =>
-            v.asInstanceOf[Seq[Any]].flatMap(Option(_)).flatMap {
-              element => buildKeyPathForInnerStructs(element, t1)
-            }
-          case other => Nil
-        }
-        val elementType = typeOfArray(array)
-        buildKeyPathForInnerStructs(array, elementType) :+ (key, elementType)
-      }
-      // we couldn't tell what the type is if the value is null or empty string
-      case (key: String, value) if value == "" || value == null => (key, 
NullType) :: Nil
-      case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil
-    }
-  }
-
-  /**
-   * Converts a Java Map/List to a Scala Map/Seq.
-   * We do not use Jackson's Scala module here because
-   * DefaultScalaModule in jackson-module-scala will make
-   * the parsing very slow.
-   */
-  private def scalafy(obj: Any): Any = obj match {
-    case map: java.util.Map[_, _] =>
-      // .map(identity) is used as a workaround for the non-serializable Map
-      // generated by .mapValues.
-      // This issue is documented at https://issues.scala-lang.org/browse/SI-7005
-      JMapWrapper(map).mapValues(scalafy).map(identity)
-    case list: java.util.List[_] =>
-      JListWrapper(list).map(scalafy)
-    case atom => atom
-  }
-
-  private def parseJson(
-      json: RDD[String],
-      columnNameOfCorruptRecords: String): RDD[Map[String, Any]] = {
-    // According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72],
-    // ObjectMapper will not return BigDecimal when
-    // "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled
-    // (see NumberDeserializer.deserialize for the logic).
-    // But we do not want to enable this feature because it will use BigDecimal
-    // for every float number, which will be slow.
-    // So, right now, we will have Infinity for those BigDecimal numbers.
-    // TODO: Support BigDecimal.
-    json.mapPartitions(iter => {
-      // When there is a key appearing multiple times (a duplicate key),
-      // the ObjectMapper will take the last value associated with this duplicate key.
-      // For example: for {"key": 1, "key":2}, we will get "key"->2.
-      val mapper = new ObjectMapper()
-      iter.flatMap { record =>
-        try {
-          val parsed = mapper.readValue(record, classOf[Object]) match {
-            case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil
-            case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]]
-            case _ =>
-              sys.error(
-                s"Failed to parse record $record. Please make sure that each 
line of the file " +
-                "(or each string in the RDD) is a valid JSON object or an 
array of JSON objects.")
-          }
-
-          parsed
-        } catch {
-          case e: JsonProcessingException =>
-            Map(columnNameOfCorruptRecords -> UTF8String.fromString(record)) :: Nil
-        }
-      }
-    })
-  }
-
-  private def toLong(value: Any): Long = {
-    value match {
-      case value: java.lang.Integer => value.asInstanceOf[Int].toLong
-      case value: java.lang.Long => value.asInstanceOf[Long]
-    }
-  }
-
-  private def toDouble(value: Any): Double = {
-    value match {
-      case value: java.lang.Integer => value.asInstanceOf[Int].toDouble
-      case value: java.lang.Long => value.asInstanceOf[Long].toDouble
-      case value: java.lang.Double => value.asInstanceOf[Double]
-    }
-  }
-
-  private def toDecimal(value: Any): Decimal = {
-    value match {
-      case value: java.lang.Integer => Decimal(value)
-      case value: java.lang.Long => Decimal(value)
-      case value: java.math.BigInteger => Decimal(new java.math.BigDecimal(value))
-      case value: java.lang.Double => Decimal(value)
-      case value: java.math.BigDecimal => Decimal(value)
-    }
-  }
-
-  private def toJsonArrayString(seq: Seq[Any]): String = {
-    val builder = new StringBuilder
-    builder.append("[")
-    var count = 0
-    seq.foreach {
-      element =>
-        if (count > 0) builder.append(",")
-        count += 1
-        builder.append(toString(element))
-    }
-    builder.append("]")
-
-    builder.toString()
-  }
-
-  private def toJsonObjectString(map: Map[String, Any]): String = {
-    val builder = new StringBuilder
-    builder.append("{")
-    var count = 0
-    map.foreach {
-      case (key, value) =>
-        if (count > 0) builder.append(",")
-        count += 1
-        val stringValue = if (value.isInstanceOf[String]) s"""\"$value\"""" else toString(value)
-        builder.append(s"""\"${key}\":${stringValue}""")
-    }
-    builder.append("}")
-
-    builder.toString()
-  }
-
-  private def toString(value: Any): String = {
-    value match {
-      case value: Map[_, _] => toJsonObjectString(value.asInstanceOf[Map[String, Any]])
-      case value: Seq[_] => toJsonArrayString(value)
-      case value => Option(value).map(_.toString).orNull
-    }
-  }
-
-  private def toDate(value: Any): Int = {
-    value match {
-      // only support string as date
-      case value: java.lang.String =>
-        DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(value).getTime)
-      case value: java.sql.Date => DateTimeUtils.fromJavaDate(value)
-    }
-  }
-
-  private def toTimestamp(value: Any): Long = {
-    value match {
-      case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 1000L
-      case value: java.lang.Long => value * 1000L
-      case value: java.lang.String => DateTimeUtils.stringToTime(value).getTime * 1000L
-    }
-  }
-
-  private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any = {
-    if (value == null) {
-      null
-    } else {
-      desiredType match {
-        case StringType => UTF8String.fromString(toString(value))
-        case _ if value == null || value == "" => null // guard the non string 
type
-        case IntegerType => value.asInstanceOf[IntegerType.InternalType]
-        case LongType => toLong(value)
-        case DoubleType => toDouble(value)
-        case DecimalType() => toDecimal(value)
-        case BooleanType => value.asInstanceOf[BooleanType.InternalType]
-        case NullType => null
-        case ArrayType(elementType, _) =>
-          value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
-        case MapType(StringType, valueType, _) =>
-          val map = value.asInstanceOf[Map[String, Any]]
-          map.map {
-            case (k, v) =>
-              (UTF8String.fromString(k), enforceCorrectType(v, valueType))
-          }.map(identity)
-        case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
-        case DateType => toDate(value)
-        case TimestampType => toTimestamp(value)
-      }
-    }
-  }
-
-  private def asRow(json: Map[String, Any], schema: StructType): InternalRow = {
-    // TODO: Reuse the row instead of creating a new one for every record.
-    val row = new GenericMutableRow(schema.fields.length)
-    schema.fields.zipWithIndex.foreach {
-      case (StructField(name, dataType, _, _), i) =>
-        row.update(i, json.get(name).flatMap(v => Option(v)).map(
-          enforceCorrectType(_, dataType)).orNull)
-    }
-
-    row
-  }
-}

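The widening rules that the deleted compatibleType implemented carry over to the streaming path's InferSchema, since any JSON schema inference must reconcile conflicting types across records: equal or numerically promotable types take the tightest common type, NullType yields to the other side, and irreconcilable combinations fall back to StringType. This is observable from the user side (hedged sketch, data made up, assuming the earlier sc and sqlContext):

    // "x" is a number in one record and a string in another, so
    // inference should widen the column to string.
    val conflicting = sc.parallelize(Seq(
      """{"x": 1}""",
      """{"x": "one"}"""))

    sqlContext.read.json(conflicting).printSchema()
    // root
    //  |-- x: string (nullable = true)
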
http://git-wip-us.apache.org/repos/asf/spark/blob/45d798c3/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 8204a58..3475f9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -1079,28 +1079,23 @@ class JsonSuite extends QueryTest with TestJsonData {
   }
 
   test("SPARK-7565 MapType in JsonRDD") {
-    val useStreaming = ctx.conf.useJacksonStreamingAPI
     val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord
     ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
 
     val schemaWithSimpleMap = StructType(
       StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
-    try{
-      for (useStreaming <- List(true, false)) {
-        ctx.setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming)
-        val temp = Utils.createTempDir().getPath
-
-        val df = ctx.read.schema(schemaWithSimpleMap).json(mapType1)
-        df.write.mode("overwrite").parquet(temp)
-        // order of MapType is not defined
-        assert(ctx.read.parquet(temp).count() == 5)
-
-        val df2 = ctx.read.json(corruptRecords)
-        df2.write.mode("overwrite").parquet(temp)
-        checkAnswer(ctx.read.parquet(temp), df2.collect())
-      }
+    try {
+      val temp = Utils.createTempDir().getPath
+
+      val df = ctx.read.schema(schemaWithSimpleMap).json(mapType1)
+      df.write.mode("overwrite").parquet(temp)
+      // order of MapType is not defined
+      assert(ctx.read.parquet(temp).count() == 5)
+
+      val df2 = ctx.read.json(corruptRecords)
+      df2.write.mode("overwrite").parquet(temp)
+      checkAnswer(ctx.read.parquet(temp), df2.collect())
     } finally {
-      ctx.setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming)
       ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
     }
   }

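Outside the test harness, the round trip the simplified test exercises looks roughly like this (Files.createTempDirectory stands in for the test-only Utils helper; mapRecords is a hypothetical RDD[String] of JSON objects with map-typed values):

    import java.nio.file.Files

    val temp = Files.createTempDirectory("json-roundtrip").toString
    val df = sqlContext.read.json(mapRecords)
    df.write.mode("overwrite").parquet(temp)

    // MapType ordering is not defined, so compare counts, not raw files.
    assert(sqlContext.read.parquet(temp).count() == df.count())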
