Repository: spark
Updated Branches:
  refs/heads/branch-1.6 053c63ecf -> a0f9cd77a


[SPARK-11745][SQL] Enable more JSON parsing options

This patch adds the following options to the JSON data source, for dealing with 
non-standard JSON files:
* `allowComments` (default `false`): ignores Java/C++ style comments in JSON 
records
* `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
* `allowSingleQuotes` (default `true`): allows single quotes in addition to 
double quotes
* `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers 
(e.g. 00012)

To avoid passing a lot of options throughout the json package, I introduced a 
new JSONOptions case class to define all JSON config options.

Also updated documentation to explain these options.

Scala

![screen shot 2015-11-15 at 6 12 12 pm](https://cloud.githubusercontent.com/assets/323388/11172965/e3ace6ec-8bc4-11e5-805e-2d78f80d0ed6.png)

Python

![screen shot 2015-11-15 at 6 11 28 pm](https://cloud.githubusercontent.com/assets/323388/11172964/e23ed6ee-8bc4-11e5-8216-312f5983acd5.png)

Author: Reynold Xin <r...@databricks.com>

Closes #9724 from rxin/SPARK-11745.

(cherry picked from commit 42de5253f327bd7ee258b0efb5024f3847fa3b51)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0f9cd77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0f9cd77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0f9cd77

Branch: refs/heads/branch-1.6
Commit: a0f9cd77a73ae911d382ac8ddd52614800663704
Parents: 053c63e
Author: Reynold Xin <r...@databricks.com>
Authored: Mon Nov 16 00:06:14 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Nov 16 00:06:21 2015 -0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  10 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  22 ++--
 .../apache/spark/sql/execution/SparkPlan.scala  |  17 +--
 .../datasources/json/InferSchema.scala          |  34 +++---
 .../datasources/json/JSONOptions.scala          |  64 +++++++++++
 .../datasources/json/JSONRelation.scala         |  20 ++--
 .../datasources/json/JacksonParser.scala        |  82 ++++++-------
 .../json/JsonParsingOptionsSuite.scala          | 114 +++++++++++++++++++
 .../execution/datasources/json/JsonSuite.scala  |  29 +++--
 9 files changed, 286 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 927f407..7b8ddb9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -153,6 +153,16 @@ class DataFrameReader(object):
                      or RDD of Strings storing JSON objects.
         :param schema: an optional :class:`StructType` for the input schema.
 
+        You can set the following JSON-specific options to deal with 
non-standard JSON files:
+            * ``primitivesAsString`` (default ``false``): infers all primitive 
values as a string \
+                type
+            * ``allowComments`` (default ``false``): ignores Java/C++ style 
comment in JSON records
+            * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted 
JSON field names
+            * ``allowSingleQuotes`` (default ``true``): allows single quotes 
in addition to double \
+                quotes
+            * ``allowNumericLeadingZeros`` (default ``false``): allows leading 
zeros in numbers \
+                (e.g. 00012)
+
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
         [('age', 'bigint'), ('name', 'string')]

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 6a194a4..5872fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, 
JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.json.{JSONOptions, 
JSONRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, 
ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
@@ -227,6 +227,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) 
extends Logging {
    * This function goes through the input once to determine the input schema. 
If you know the
    * schema in advance, use the version that specifies the schema to avoid the 
extra scan.
    *
+   * You can set the following JSON-specific options to deal with non-standard 
JSON files:
+   * <li>`primitivesAsString` (default `false`): infers all primitive values 
as a string type</li>
+   * <li>`allowComments` (default `false`): ignores Java/C++ style comment in 
JSON records</li>
+   * <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON 
field names</li>
+   * <li>`allowSingleQuotes` (default `true`): allows single quotes in 
addition to double quotes
+   * </li>
+   * <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in 
numbers
+   * (e.g. 00012)</li>
+   *
    * @param path input path
    * @since 1.4.0
    */
@@ -255,16 +264,13 @@ class DataFrameReader private[sql](sqlContext: 
SQLContext) extends Logging {
    * @since 1.4.0
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
-    val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
-    val primitivesAsString = extraOptions.getOrElse("primitivesAsString", 
"false").toBoolean
     sqlContext.baseRelationToDataFrame(
       new JSONRelation(
         Some(jsonRDD),
-        samplingRatio,
-        primitivesAsString,
-        userSpecifiedSchema,
-        None,
-        None)(sqlContext)
+        maybeDataSchema = userSpecifiedSchema,
+        maybePartitionSpec = None,
+        userDefinedPartitionColumns = None,
+        parameters = extraOptions.toMap)(sqlContext)
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 1b83300..534a3bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -221,22 +221,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
 
   private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
 
-  protected def newProjection(
-      expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = 
{
-    log.debug(s"Creating Projection: $expressions, inputSchema: $inputSchema")
-    try {
-      GenerateProjection.generate(expressions, inputSchema)
-    } catch {
-      case e: Exception =>
-        if (isTesting) {
-          throw e
-        } else {
-          log.error("Failed to generate projection, fallback to interpret", e)
-          new InterpretedProjection(expressions, inputSchema)
-        }
-    }
-  }
-
   protected def newMutableProjection(
       expressions: Seq[Expression], inputSchema: Seq[Attribute]): () => 
MutableProjection = {
     log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
@@ -282,6 +266,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
         }
     }
   }
+
   /**
    * Creates a row ordering for the given schema, in natural ascending order.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index b9914c5..922fd5b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -25,33 +25,36 @@ import 
org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-private[sql] object InferSchema {
+
+private[json] object InferSchema {
+
   /**
    * Infer the type of a collection of json records in three stages:
    *   1. Infer the type of each record
    *   2. Merge types by choosing the lowest type necessary to cover equal keys
    *   3. Replace any remaining null fields with string, the top type
    */
-  def apply(
+  def infer(
       json: RDD[String],
-      samplingRatio: Double = 1.0,
       columnNameOfCorruptRecords: String,
-      primitivesAsString: Boolean = false): StructType = {
-    require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be 
greater than 0")
-    val schemaData = if (samplingRatio > 0.99) {
+      configOptions: JSONOptions): StructType = {
+    require(configOptions.samplingRatio > 0,
+      s"samplingRatio (${configOptions.samplingRatio}) should be greater than 
0")
+    val schemaData = if (configOptions.samplingRatio > 0.99) {
       json
     } else {
-      json.sample(withReplacement = false, samplingRatio, 1)
+      json.sample(withReplacement = false, configOptions.samplingRatio, 1)
     }
 
     // perform schema inference on each row and merge afterwards
     val rootType = schemaData.mapPartitions { iter =>
       val factory = new JsonFactory()
+      configOptions.setJacksonOptions(factory)
       iter.map { row =>
         try {
           Utils.tryWithResource(factory.createParser(row)) { parser =>
             parser.nextToken()
-            inferField(parser, primitivesAsString)
+            inferField(parser, configOptions)
           }
         } catch {
           case _: JsonParseException =>
@@ -71,14 +74,14 @@ private[sql] object InferSchema {
   /**
    * Infer the type of a json document from the parser's token stream
    */
-  private def inferField(parser: JsonParser, primitivesAsString: Boolean): 
DataType = {
+  private def inferField(parser: JsonParser, configOptions: JSONOptions): 
DataType = {
     import com.fasterxml.jackson.core.JsonToken._
     parser.getCurrentToken match {
       case null | VALUE_NULL => NullType
 
       case FIELD_NAME =>
         parser.nextToken()
-        inferField(parser, primitivesAsString)
+        inferField(parser, configOptions)
 
       case VALUE_STRING if parser.getTextLength < 1 =>
         // Zero length strings and nulls have special handling to deal
@@ -95,7 +98,7 @@ private[sql] object InferSchema {
         while (nextUntil(parser, END_OBJECT)) {
           builder += StructField(
             parser.getCurrentName,
-            inferField(parser, primitivesAsString),
+            inferField(parser, configOptions),
             nullable = true)
         }
 
@@ -107,14 +110,15 @@ private[sql] object InferSchema {
         // the type as we pass through all JSON objects.
         var elementType: DataType = NullType
         while (nextUntil(parser, END_ARRAY)) {
-          elementType = compatibleType(elementType, inferField(parser, 
primitivesAsString))
+          elementType = compatibleType(
+            elementType, inferField(parser, configOptions))
         }
 
         ArrayType(elementType)
 
-      case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) if primitivesAsString => 
StringType
+      case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) if 
configOptions.primitivesAsString => StringType
 
-      case (VALUE_TRUE | VALUE_FALSE) if primitivesAsString => StringType
+      case (VALUE_TRUE | VALUE_FALSE) if configOptions.primitivesAsString => 
StringType
 
       case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
         import JsonParser.NumberType._
@@ -178,7 +182,7 @@ private[sql] object InferSchema {
   /**
    * Returns the most general data type for two given data types.
    */
-  private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
+  def compatibleType(t1: DataType, t2: DataType): DataType = {
     HiveTypeCoercion.findTightestCommonTypeOfTwo(t1, t2).getOrElse {
       // t1 or t2 is a StructType, ArrayType, or an unexpected type.
       (t1, t2) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
new file mode 100644
index 0000000..c132ead
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import com.fasterxml.jackson.core.{JsonParser, JsonFactory}
+
+/**
+ * Options for the JSON data source.
+ *
+ * Most of these map directly to Jackson's internal options, specified in 
[[JsonParser.Feature]].
+ */
+case class JSONOptions(
+    samplingRatio: Double = 1.0,
+    primitivesAsString: Boolean = false,
+    allowComments: Boolean = false,
+    allowUnquotedFieldNames: Boolean = false,
+    allowSingleQuotes: Boolean = true,
+    allowNumericLeadingZeros: Boolean = false,
+    allowNonNumericNumbers: Boolean = false) {
+
+  /** Sets config options on a Jackson [[JsonFactory]]. */
+  def setJacksonOptions(factory: JsonFactory): Unit = {
+    factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
+    factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, 
allowUnquotedFieldNames)
+    factory.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, 
allowSingleQuotes)
+    factory.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, 
allowNumericLeadingZeros)
+    factory.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, 
allowNonNumericNumbers)
+  }
+}
+
+
+object JSONOptions {
+  def createFromConfigMap(parameters: Map[String, String]): JSONOptions = 
JSONOptions(
+    samplingRatio =
+      parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0),
+    primitivesAsString =
+      parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false),
+    allowComments =
+      parameters.get("allowComments").map(_.toBoolean).getOrElse(false),
+    allowUnquotedFieldNames =
+      
parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false),
+    allowSingleQuotes =
+      parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true),
+    allowNumericLeadingZeros =
+      
parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false),
+    allowNonNumericNumbers =
+      parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
+  )
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index dca638b..3e61ba3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -52,13 +52,9 @@ class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
       dataSchema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation = {
-    val samplingRatio = 
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
-    val primitivesAsString = 
parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
 
     new JSONRelation(
       inputRDD = None,
-      samplingRatio = samplingRatio,
-      primitivesAsString = primitivesAsString,
       maybeDataSchema = dataSchema,
       maybePartitionSpec = None,
       userDefinedPartitionColumns = partitionColumns,
@@ -69,8 +65,6 @@ class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
 
 private[sql] class JSONRelation(
     val inputRDD: Option[RDD[String]],
-    val samplingRatio: Double,
-    val primitivesAsString: Boolean,
     val maybeDataSchema: Option[StructType],
     val maybePartitionSpec: Option[PartitionSpec],
     override val userDefinedPartitionColumns: Option[StructType],
@@ -79,6 +73,8 @@ private[sql] class JSONRelation(
     (@transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
+  val options: JSONOptions = JSONOptions.createFromConfigMap(parameters)
+
   /** Constraints to be imposed on schema to be stored. */
   private def checkConstraints(schema: StructType): Unit = {
     if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
@@ -109,17 +105,16 @@ private[sql] class JSONRelation(
       classOf[Text]).map(_._2.toString) // get the text line
   }
 
-  override lazy val dataSchema = {
+  override lazy val dataSchema: StructType = {
     val jsonSchema = maybeDataSchema.getOrElse {
       val files = cachedLeafStatuses().filterNot { status =>
         val name = status.getPath.getName
         name.startsWith("_") || name.startsWith(".")
       }.toArray
-      InferSchema(
+      InferSchema.infer(
         inputRDD.getOrElse(createBaseRdd(files)),
-        samplingRatio,
         sqlContext.conf.columnNameOfCorruptRecord,
-        primitivesAsString)
+        options)
     }
     checkConstraints(jsonSchema)
 
@@ -132,10 +127,11 @@ private[sql] class JSONRelation(
       inputPaths: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] 
= {
     val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
-    val rows = JacksonParser(
+    val rows = JacksonParser.parse(
       inputRDD.getOrElse(createBaseRdd(inputPaths)),
       requiredDataSchema,
-      sqlContext.conf.columnNameOfCorruptRecord)
+      sqlContext.conf.columnNameOfCorruptRecord,
+      options)
 
     rows.mapPartitions { iterator =>
       val unsafeProjection = UnsafeProjection.create(requiredDataSchema)

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 4f53eeb..bfa1405 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import java.io.ByteArrayOutputStream
+import scala.collection.mutable.ArrayBuffer
 
 import com.fasterxml.jackson.core._
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -32,18 +31,23 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
-private[sql] object JacksonParser {
-  def apply(
-      json: RDD[String],
+object JacksonParser {
+
+  def parse(
+      input: RDD[String],
       schema: StructType,
-      columnNameOfCorruptRecords: String): RDD[InternalRow] = {
-    parseJson(json, schema, columnNameOfCorruptRecords)
+      columnNameOfCorruptRecords: String,
+      configOptions: JSONOptions): RDD[InternalRow] = {
+
+    input.mapPartitions { iter =>
+      parseJson(iter, schema, columnNameOfCorruptRecords, configOptions)
+    }
   }
 
   /**
    * Parse the current token (and related children) according to a desired 
schema
    */
-  private[sql] def convertField(
+  def convertField(
       factory: JsonFactory,
       parser: JsonParser,
       schema: DataType): Any = {
@@ -226,9 +230,10 @@ private[sql] object JacksonParser {
   }
 
   private def parseJson(
-      json: RDD[String],
+      input: Iterator[String],
       schema: StructType,
-      columnNameOfCorruptRecords: String): RDD[InternalRow] = {
+      columnNameOfCorruptRecords: String,
+      configOptions: JSONOptions): Iterator[InternalRow] = {
 
     def failedRecord(record: String): Seq[InternalRow] = {
       // create a row even if no corrupt record column is present
@@ -241,37 +246,36 @@ private[sql] object JacksonParser {
       Seq(row)
     }
 
-    json.mapPartitions { iter =>
-      val factory = new JsonFactory()
-
-      iter.flatMap { record =>
-        if (record.trim.isEmpty) {
-          Nil
-        } else {
-          try {
-            Utils.tryWithResource(factory.createParser(record)) { parser =>
-              parser.nextToken()
-
-              convertField(factory, parser, schema) match {
-                case null => failedRecord(record)
-                case row: InternalRow => row :: Nil
-                case array: ArrayData =>
-                  if (array.numElements() == 0) {
-                    Nil
-                  } else {
-                    array.toArray[InternalRow](schema)
-                  }
-                case _ =>
-                  sys.error(
-                    s"Failed to parse record $record. Please make sure that 
each line of " +
-                      "the file (or each string in the RDD) is a valid JSON 
object or " +
-                      "an array of JSON objects.")
-              }
+    val factory = new JsonFactory()
+    configOptions.setJacksonOptions(factory)
+
+    input.flatMap { record =>
+      if (record.trim.isEmpty) {
+        Nil
+      } else {
+        try {
+          Utils.tryWithResource(factory.createParser(record)) { parser =>
+            parser.nextToken()
+
+            convertField(factory, parser, schema) match {
+              case null => failedRecord(record)
+              case row: InternalRow => row :: Nil
+              case array: ArrayData =>
+                if (array.numElements() == 0) {
+                  Nil
+                } else {
+                  array.toArray[InternalRow](schema)
+                }
+              case _ =>
+                sys.error(
+                  s"Failed to parse record $record. Please make sure that each 
line of " +
+                    "the file (or each string in the RDD) is a valid JSON 
object or " +
+                    "an array of JSON objects.")
             }
-          } catch {
-            case _: JsonProcessingException =>
-              failedRecord(record)
           }
+        } catch {
+          case _: JsonProcessingException =>
+            failedRecord(record)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
new file mode 100644
index 0000000..4cc0a3a
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Test cases for various [[JSONOptions]].
+ */
+class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
+
+  test("allowComments off") {
+    val str = """{'name': /* hello */ 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.json(rdd)
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  test("allowComments on") {
+    val str = """{'name': /* hello */ 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.option("allowComments", "true").json(rdd)
+
+    assert(df.schema.head.name == "name")
+    assert(df.first().getString(0) == "Reynold Xin")
+  }
+
+  test("allowSingleQuotes off") {
+    val str = """{'name': 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.option("allowSingleQuotes", "false").json(rdd)
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  test("allowSingleQuotes on") {
+    val str = """{'name': 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.json(rdd)
+
+    assert(df.schema.head.name == "name")
+    assert(df.first().getString(0) == "Reynold Xin")
+  }
+
+  test("allowUnquotedFieldNames off") {
+    val str = """{name: 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.json(rdd)
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  test("allowUnquotedFieldNames on") {
+    val str = """{name: 'Reynold Xin'}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.option("allowUnquotedFieldNames", 
"true").json(rdd)
+
+    assert(df.schema.head.name == "name")
+    assert(df.first().getString(0) == "Reynold Xin")
+  }
+
+  test("allowNumericLeadingZeros off") {
+    val str = """{"age": 0018}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.json(rdd)
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  test("allowNumericLeadingZeros on") {
+    val str = """{"age": 0018}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.option("allowNumericLeadingZeros", 
"true").json(rdd)
+
+    assert(df.schema.head.name == "age")
+    assert(df.first().getLong(0) == 18)
+  }
+
+  // The following two tests are not really working - need to look into 
Jackson's
+  // JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS.
+  ignore("allowNonNumericNumbers off") {
+    val str = """{"age": NaN}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.json(rdd)
+
+    assert(df.schema.head.name == "_corrupt_record")
+  }
+
+  ignore("allowNonNumericNumbers on") {
+    val str = """{"age": NaN}"""
+    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+    val df = sqlContext.read.option("allowNonNumericNumbers", "true").json(rdd)
+
+    assert(df.schema.head.name == "age")
+    assert(df.first().getDouble(0).isNaN)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 28b8f02..6042b11 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -588,7 +588,7 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
       relation.isInstanceOf[JSONRelation],
       "The DataFrame returned by jsonFile should be based on JSONRelation.")
     assert(relation.asInstanceOf[JSONRelation].paths === Array(path))
-    assert(relation.asInstanceOf[JSONRelation].samplingRatio === (0.49 +- 
0.001))
+    assert(relation.asInstanceOf[JSONRelation].options.samplingRatio === (0.49 
+- 0.001))
 
     val schema = StructType(StructField("a", LongType, true) :: Nil)
     val logicalRelation =
@@ -597,7 +597,7 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
     val relationWithSchema = 
logicalRelation.relation.asInstanceOf[JSONRelation]
     assert(relationWithSchema.paths === Array(path))
     assert(relationWithSchema.schema === schema)
-    assert(relationWithSchema.samplingRatio > 0.99)
+    assert(relationWithSchema.options.samplingRatio > 0.99)
   }
 
   test("Loading a JSON dataset from a text file") {
@@ -1165,31 +1165,28 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
   test("JSONRelation equality test") {
     val relation0 = new JSONRelation(
       Some(empty),
-      1.0,
-      false,
       Some(StructType(StructField("a", IntegerType, true) :: Nil)),
-      None, None)(sqlContext)
+      None,
+      None)(sqlContext)
     val logicalRelation0 = LogicalRelation(relation0)
     val relation1 = new JSONRelation(
       Some(singleRow),
-      1.0,
-      false,
       Some(StructType(StructField("a", IntegerType, true) :: Nil)),
-      None, None)(sqlContext)
+      None,
+      None)(sqlContext)
     val logicalRelation1 = LogicalRelation(relation1)
     val relation2 = new JSONRelation(
       Some(singleRow),
-      0.5,
-      false,
       Some(StructType(StructField("a", IntegerType, true) :: Nil)),
-      None, None)(sqlContext)
+      None,
+      None,
+      parameters = Map("samplingRatio" -> "0.5"))(sqlContext)
     val logicalRelation2 = LogicalRelation(relation2)
     val relation3 = new JSONRelation(
       Some(singleRow),
-      1.0,
-      false,
       Some(StructType(StructField("b", IntegerType, true) :: Nil)),
-      None, None)(sqlContext)
+      None,
+      None)(sqlContext)
     val logicalRelation3 = LogicalRelation(relation3)
 
     assert(relation0 !== relation1)
@@ -1232,7 +1229,7 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
 
   test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
     // This is really a test that it doesn't throw an exception
-    val emptySchema = InferSchema(empty, 1.0, "")
+    val emptySchema = InferSchema.infer(empty, "", JSONOptions())
     assert(StructType(Seq()) === emptySchema)
   }
 
@@ -1256,7 +1253,7 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
   }
 
   test("SPARK-8093 Erase empty structs") {
-    val emptySchema = InferSchema(emptyRecords, 1.0, "")
+    val emptySchema = InferSchema.infer(emptyRecords, "", JSONOptions())
     assert(StructType(Seq()) === emptySchema)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to