Repository: spark
Updated Branches:
  refs/heads/master 1faa1135a -> 1c7f0ab30
[SPARK-3339][SQL] Support for skipping json lines that fail to parse

This PR aims to provide a way to skip/query corrupt JSON records. To do so, we introduce an internal column that holds corrupt records (the default name is `_corrupt_record`; it can be changed by setting `spark.sql.columnNameOfCorruptRecord`). When a record fails to parse, we store it, in its raw unparsed form, in this internal column. Users can then skip or query corrupt records through SQL.

* To query corrupt records:
```
-- For Hive parser
SELECT `_corrupt_record`
FROM jsonTable
WHERE `_corrupt_record` IS NOT NULL

-- For our SQL parser
SELECT _corrupt_record
FROM jsonTable
WHERE _corrupt_record IS NOT NULL
```
* To skip corrupt records and query only regular records:
```
-- For Hive parser
SELECT field1, field2
FROM jsonTable
WHERE `_corrupt_record` IS NULL

-- For our SQL parser
SELECT field1, field2
FROM jsonTable
WHERE _corrupt_record IS NULL
```
Generally, changing the name of the internal column is not recommended. If the name must be changed to avoid a conflict with an existing column, use `sqlContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, <new column name>)` or `sqlContext.sql("SET spark.sql.columnNameOfCorruptRecord=<new column name>")`.

Author: Yin Huai <[email protected]>

Closes #2680 from yhuai/corruptJsonRecord and squashes the following commits:

4c9828e [Yin Huai] Merge remote-tracking branch 'upstream/master' into corruptJsonRecord
309616a [Yin Huai] Change the default name of corrupt record to "_corrupt_record".
b4a3632 [Yin Huai] Merge remote-tracking branch 'upstream/master' into corruptJsonRecord
9375ae9 [Yin Huai] Set the column name of corrupt json record back to the default one after the unit test.
ee584c0 [Yin Huai] Provide a way to query corrupt json records as unparsed strings.
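For anyone trying this out end to end, here is a minimal Scala sketch of the workflow described above. The local master, sample data, and the `_unparsed` override are illustrative, not part of this commit; the conf is set through its string key, which is the public surface a user application would see:
```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("corrupt-json-demo"))
val sqlContext = new SQLContext(sc)

// Optional rename, done before any JSON is loaded; the key is the
// spark.sql.columnNameOfCorruptRecord property introduced by this commit.
sqlContext.setConf("spark.sql.columnNameOfCorruptRecord", "_unparsed")

// One clean record and one with an unquoted key, which fails to parse.
val json = sc.parallelize(
  """{"a": "str_a"}""" ::
  """{"a": "str_a", b: 2}""" :: Nil)

sqlContext.jsonRDD(json).registerTempTable("jsonTable")

// The corrupt line comes back as a raw, unparsed string.
sqlContext.sql("SELECT _unparsed FROM jsonTable WHERE _unparsed IS NOT NULL")
  .collect()
  .foreach(println)
```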
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c7f0ab3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c7f0ab3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c7f0ab3

Branch: refs/heads/master
Commit: 1c7f0ab302de9f82b1bd6da852d133823bc67c66
Parents: 1faa113
Author: Yin Huai <[email protected]>
Authored: Thu Oct 9 14:57:27 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Thu Oct 9 14:57:27 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  4 ++
 .../scala/org/apache/spark/sql/SQLContext.scala | 14 +++--
 .../spark/sql/api/java/JavaSQLContext.scala     | 16 +++--
 .../org/apache/spark/sql/json/JsonRDD.scala     | 30 ++++++----
 .../org/apache/spark/sql/json/JsonSuite.scala   | 62 +++++++++++++++++++-
 .../apache/spark/sql/json/TestJsonData.scala    |  9 +++
 6 files changed, 116 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f6f4cf3..07e6e2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -35,6 +35,7 @@ private[spark] object SQLConf {
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+  val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
 
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
@@ -131,6 +132,9 @@ private[sql] trait SQLConf {
   private[spark] def inMemoryPartitionPruning: Boolean =
     getConf(IN_MEMORY_PARTITION_PRUNING, "false").toBoolean
 
+  private[spark] def columnNameOfCorruptRecord: String =
+    getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties.
    */
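The new getter is a plain read-with-default. A standalone sketch of the same lookup shape, with a simple concurrent map standing in for SQLConf's internal settings store (all names here are illustrative):
```
import java.util.concurrent.ConcurrentHashMap

// Stand-in for SQLConf's internal settings storage.
val settings = new ConcurrentHashMap[String, String]()

def getConf(key: String, defaultValue: String): String =
  Option(settings.get(key)).getOrElse(defaultValue)

// Same shape as the getter in the diff: an unset key yields the default name.
def columnNameOfCorruptRecord: String =
  getConf("spark.sql.columnNameOfCorruptRecord", "_corrupt_record")

assert(columnNameOfCorruptRecord == "_corrupt_record")
settings.put("spark.sql.columnNameOfCorruptRecord", "_unparsed")
assert(columnNameOfCorruptRecord == "_unparsed")
```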
http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 35561ca..014e1e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -195,9 +195,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
+    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
     val appliedSchema =
-      Option(schema).getOrElse(JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, 1.0)))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+      Option(schema).getOrElse(
+        JsonRDD.nullTypeToStringType(
+          JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
+    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
     applySchema(rowRDD, appliedSchema)
   }
 
@@ -206,8 +209,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
-    val appliedSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+    val appliedSchema =
+      JsonRDD.nullTypeToStringType(
+        JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
+    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
     applySchema(rowRDD, appliedSchema)
   }
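Note that both overloads copy `columnNameOfCorruptRecord` into a local val before handing it to JsonRDD. A plausible reason, not stated in the commit, is closure capture: reading the getter inside an RDD transformation would pull the enclosing context into the serialized closure, while a local String travels cheaply to executors. A contrived sketch of the distinction (names hypothetical):
```
import org.apache.spark.rdd.RDD

// Stand-in for a context object we do not want to ship to executors.
class DemoContext {
  def columnNameOfCorruptRecord: String = "_corrupt_record"

  def tag(rdd: RDD[String]): RDD[(String, String)] = {
    // Copying to a local val means the map closure below captures only a
    // String, not `this`; calling the getter directly inside map would
    // capture the whole DemoContext instead.
    val name = columnNameOfCorruptRecord
    rdd.map(record => (name, record))
  }
}
```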
http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index c006c43..f8171c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -148,8 +148,12 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    * It goes through the entire dataset once to determine the schema.
    */
   def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
-    val appliedScalaSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))
-    val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+    val appliedScalaSchema =
+      JsonRDD.nullTypeToStringType(
+        JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord))
+    val scalaRowRDD =
+      JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
     val logicalPlan = LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
 
     new JavaSchemaRDD(sqlContext, logicalPlan)
@@ -162,10 +166,14 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    */
   @Experimental
   def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = {
+    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
     val appliedScalaSchema =
       Option(asScalaDataType(schema)).getOrElse(
-        JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))).asInstanceOf[SStructType]
-    val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+        JsonRDD.nullTypeToStringType(
+          JsonRDD.inferSchema(
+            json.rdd, 1.0, columnNameOfCorruptJsonRecord))).asInstanceOf[SStructType]
+    val scalaRowRDD = JsonRDD.jsonStringToRow(
+      json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
     val logicalPlan = LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
 
     new JavaSchemaRDD(sqlContext, logicalPlan)


http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index fbc2965..61ee960 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -22,6 +22,7 @@ import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
 import scala.math.BigDecimal
 import java.sql.Timestamp
 
+import com.fasterxml.jackson.core.JsonProcessingException
 import com.fasterxml.jackson.databind.ObjectMapper
 
 import org.apache.spark.rdd.RDD
@@ -35,16 +36,19 @@ private[sql] object JsonRDD extends Logging {
 
   private[sql] def jsonStringToRow(
       json: RDD[String],
-      schema: StructType): RDD[Row] = {
-    parseJson(json).map(parsed => asRow(parsed, schema))
+      schema: StructType,
+      columnNameOfCorruptRecords: String): RDD[Row] = {
+    parseJson(json, columnNameOfCorruptRecords).map(parsed => asRow(parsed, schema))
   }
 
   private[sql] def inferSchema(
       json: RDD[String],
-      samplingRatio: Double = 1.0): StructType = {
+      samplingRatio: Double = 1.0,
+      columnNameOfCorruptRecords: String): StructType = {
     require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
     val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
-    val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
+    val allKeys =
+      parseJson(schemaData, columnNameOfCorruptRecords).map(allKeysWithValueTypes).reduce(_ ++ _)
     createSchema(allKeys)
   }
 
@@ -274,7 +278,9 @@ private[sql] object JsonRDD extends Logging {
     case atom => atom
   }
 
-  private def parseJson(json: RDD[String]): RDD[Map[String, Any]] = {
+  private def parseJson(
+      json: RDD[String],
+      columnNameOfCorruptRecords: String): RDD[Map[String, Any]] = {
     // According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72],
     // ObjectMapper will not return BigDecimal when
     // "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled
@@ -289,12 +295,16 @@
       // For example: for {"key": 1, "key":2}, we will get "key"->2.
       val mapper = new ObjectMapper()
       iter.flatMap { record =>
-        val parsed = mapper.readValue(record, classOf[Object]) match {
-          case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil
-          case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]]
-        }
+        try {
+          val parsed = mapper.readValue(record, classOf[Object]) match {
+            case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil
+            case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]]
+          }
 
-        parsed
+          parsed
+        } catch {
+          case e: JsonProcessingException => Map(columnNameOfCorruptRecords -> record) :: Nil
+        }
       }
     })
   }
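The catch above relies on Jackson's exception contract: malformed input surfaces as a subclass of `JsonProcessingException` (parse errors, and also "no content" for an empty string), so only parse failures become corrupt-record rows while any other exception still propagates. A standalone, non-Spark illustration of that contract; `parseOrCorrupt` is a hypothetical helper, not code from this commit:
```
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper

val mapper = new ObjectMapper()

// Mirrors the shape of the catch in parseJson.
def parseOrCorrupt(record: String): Map[String, Any] =
  try {
    Map("parsed" -> mapper.readValue(record, classOf[Object]))
  } catch {
    case e: JsonProcessingException => Map("_corrupt_record" -> record)
  }

parseOrCorrupt("""{"a": 1}""")       // Map(parsed -> {a=1})
parseOrCorrupt("""{"a": 1, b: 2}""") // Map(_corrupt_record -> {"a": 1, b: 2})
parseOrCorrupt("")                   // also corrupt: "no content to map"
```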
http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 3cfcb2b..7bb08f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -21,6 +21,8 @@ import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.json.JsonRDD.{enforceCorrectType, compatibleType}
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 
 import java.sql.Timestamp
@@ -644,7 +646,65 @@ class JsonSuite extends QueryTest {
       ("str_a_1", null, null) ::
       ("str_a_2", null, null) ::
       (null, "str_b_3", null) ::
-      ("str_a_4", "str_b_4", "str_c_4") ::Nil
+      ("str_a_4", "str_b_4", "str_c_4") :: Nil
     )
   }
+
+  test("Corrupt records") {
+    // Test if we can query corrupt records.
+    val oldColumnNameOfCorruptRecord = TestSQLContext.columnNameOfCorruptRecord
+    TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
+
+    val jsonSchemaRDD = jsonRDD(corruptRecords)
+    jsonSchemaRDD.registerTempTable("jsonTable")
+
+    val schema = StructType(
+      StructField("_unparsed", StringType, true) ::
+      StructField("a", StringType, true) ::
+      StructField("b", StringType, true) ::
+      StructField("c", StringType, true) :: Nil)
+
+    assert(schema === jsonSchemaRDD.schema)
+
+    // In HiveContext, backticks should be used to access columns starting with a underscore.
+    checkAnswer(
+      sql(
+        """
+          |SELECT a, b, c, _unparsed
+          |FROM jsonTable
+        """.stripMargin),
+      (null, null, null, "{") ::
+      (null, null, null, "") ::
+      (null, null, null, """{"a":1, b:2}""") ::
+      (null, null, null, """{"a":{, b:3}""") ::
+      ("str_a_4", "str_b_4", "str_c_4", null) ::
+      (null, null, null, "]") :: Nil
+    )
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT a, b, c
+          |FROM jsonTable
+          |WHERE _unparsed IS NULL
+        """.stripMargin),
+      ("str_a_4", "str_b_4", "str_c_4") :: Nil
+    )
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT _unparsed
+          |FROM jsonTable
+          |WHERE _unparsed IS NOT NULL
+        """.stripMargin),
+      Seq("{") ::
+      Seq("") ::
+      Seq("""{"a":1, b:2}""") ::
+      Seq("""{"a":{, b:3}""") ::
+      Seq("]") :: Nil
+    )
+
+    TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
+  }
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index fc833b8..eaca9f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -143,4 +143,13 @@ object TestJsonData {
     """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" ::
     """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
     """[]""" :: Nil)
+
+  val corruptRecords =
+    TestSQLContext.sparkContext.parallelize(
+      """{""" ::
+      """""" ::
+      """{"a":1, b:2}""" ::
+      """{"a":{, b:3}""" ::
+      """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
+      """]""" :: Nil)
 }
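As a usage note, the skip pattern from the commit message can be wrapped in a small helper. This is a hypothetical convenience, not part of the commit; it assumes the default column name is in effect, and the WHERE clause resolves only when at least one record failed to parse (otherwise `_corrupt_record` is absent from the inferred schema):
```
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD, SQLContext}

// Returns only the rows that parsed cleanly. The table name is illustrative.
def cleanRows(sqlContext: SQLContext, json: RDD[String]): SchemaRDD = {
  sqlContext.jsonRDD(json).registerTempTable("json_input")
  sqlContext.sql("SELECT * FROM json_input WHERE _corrupt_record IS NULL")
}
```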
