[ https://issues.apache.org/jira/browse/SPARK-22248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16200074#comment-16200074 ]

Gaurav Shah commented on SPARK-22248:
-------------------------------------

We can work on a patch unless there was an explicit reason not to parse 
it.  The `PERMISSIVE` mode definition says `sets other fields to null when it meets 
a corrupted record, and puts the malformed string into a new field configured 
by columnNameOfCorruptRecord.`  So I guess it's meant to parse the other fields 
at least.
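
For reference, this is roughly how the corrupt-record column is wired up from the user side (a minimal sketch; `_corrupt_record` is just the default column name and the path is illustrative):
{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local").appName("permissive-sketch").getOrCreate()

// Add the corrupt-record column to the schema so PERMISSIVE mode has somewhere
// to put the raw text of a record that fails conversion.
val schema = StructType(
  StructField("name", StringType) ::
  StructField("count", LongType) ::
  StructField("_corrupt_record", StringType) :: Nil)

val events = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/path/to/events.json")  // illustrative path

// Whether the other columns come back parsed or as null for a malformed
// record is exactly the behaviour this issue is about.
events.show(false)
{code}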

> spark marks all columns as null when its unable to parse one column
> -------------------------------------------------------------------
>
>                 Key: SPARK-22248
>                 URL: https://issues.apache.org/jira/browse/SPARK-22248
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.2, 2.2.0
>            Reporter: Vignesh Mohan
>
> When parsing JSON data in `PERMISSIVE` mode, if one column mismatches the 
> schema, Spark sets all column values to null.
> {code}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
>
> val conf = new SparkConf().setMaster("local").setAppName("app")
> val sc = new SparkContext(conf)
> val sqlContext = new SQLContext(sc)
> val sparkschema: StructType =
>   StructType(StructField("name", StringType) :: StructField("count", LongType) :: Nil)
>
> val rdd = sc.parallelize(List(
>   """
>     |{"name": "foo", "count": 24.0}}
>     |""".stripMargin,
>   """
>     |{"name": "bar", "count": 24}}
>     |""".stripMargin
> ))
>
> sqlContext.read.schema(sparkschema).json(rdd).createOrReplaceTempView("events")
> sqlContext.sql(
>   """
>     | select
>     | name,count
>     | from
>     | events
>   """.stripMargin).collect.foreach(println)
> {code}
> Output:
> {code}
> 17/10/11 03:12:04 WARN JacksonParser: Found at least one malformed records (sample: 
> {"name": "foo", "count": 24.0}}
> ). The JSON reader will replace
> all malformed records with placeholder null in current PERMISSIVE parser mode.
> To find out which corrupted records have been replaced with null, please use the
> default inferred schema instead of providing a custom schema.
> Code example to print all malformed records (scala):
> ===================================================
> // The corrupted record exists in column _corrupt_record.
> val parsedJson = spark.read.json("/path/to/json/file/test.json")
>
> [null,null]
> [bar,24]
> {code}
> Expected output:
> {code}
> [foo,null]
> [bar,24]
> {code}
> The problem comes from 
> `spark-catalyst_2.11-2.1.0-sources.jar!/org/apache/spark/sql/catalyst/json/JacksonParser.scala`
> {code}
> private def failedConversion(
>       parser: JsonParser,
>       dataType: DataType): PartialFunction[JsonToken, Any] = {
>     case VALUE_STRING if parser.getTextLength < 1 =>
>       // If conversion is failed, this produces `null` rather than throwing exception.
>       // This will protect the mismatch of types.
>       null
>     case token =>
>       // We cannot parse this token based on the given data type. So, we throw a
>       // SparkSQLJsonProcessingException and this exception will be caught by
>       // `parse` method.
>       throw new SparkSQLJsonProcessingException(
>         s"Failed to parse a value for data type $dataType (current token: $token).")
>   }
> {code}
> This raises an exception when parsing the column, and the `parse` method
> {code}
> def parse(input: String): Seq[InternalRow] = {
>     if (input.trim.isEmpty) {
>       Nil
>     } else {
>       try {
>         Utils.tryWithResource(factory.createParser(input)) { parser =>
>           parser.nextToken()
>           rootConverter.apply(parser) match {
>             case null => failedRecord(input)
>             case row: InternalRow => row :: Nil
>             case array: ArrayData =>
>               // Here, as we support reading top level JSON arrays and take every element
>               // in such an array as a row, this case is possible.
>               if (array.numElements() == 0) {
>                 Nil
>               } else {
>                 array.toArray[InternalRow](schema)
>               }
>             case _ =>
>               failedRecord(input)
>           }
>         }
>       } catch {
>         case _: JsonProcessingException =>
>           failedRecord(input)
>         case _: SparkSQLJsonProcessingException =>
>           failedRecord(input)
>       }
>     }
>   }
> {code}
> then catches it and marks the whole record as a failedRecord, so every field comes back null.
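>
> A possible direction for a patch, sketched outside of Spark's internals: catch the conversion failure per field instead of letting it escape to `parse`, so only the field that failed becomes null. Everything below (the `Field` helper, the toy schema) is illustrative, not the actual JacksonParser code.
> {code}
> import scala.util.Try
>
> // Toy per-field conversion: null out only the field whose conversion fails,
> // instead of discarding the whole record.
> case class Field(name: String, convert: Any => Any)
>
> def convertRecord(raw: Map[String, Any], fields: Seq[Field]): Seq[Any] =
>   fields.map { f =>
>     raw.get(f.name) match {
>       case Some(v) => Try(f.convert(v)).getOrElse(null)  // only this field becomes null
>       case None    => null
>     }
>   }
>
> val toySchema = Seq(
>   Field("name",  v => v.toString),
>   Field("count", v => v.toString.toLong))  // fails for "24.0", succeeds for "24"
>
> println(convertRecord(Map("name" -> "foo", "count" -> "24.0"), toySchema))  // List(foo, null)
> println(convertRecord(Map("name" -> "bar", "count" -> "24"), toySchema))    // List(bar, 24)
> {code}
> In JacksonParser terms this would roughly mean handling the conversion failure inside the per-field converter rather than in `parse`, so failedRecord is only used when the record itself cannot be tokenised.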



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
