[ https://issues.apache.org/jira/browse/SPARK-28079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16868327#comment-16868327 ]

F Jimenez commented on SPARK-28079:
-----------------------------------

Hi, sorry for the late response.

I'll paste the relevant part of the documentation here for convenience (from 
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameReader.html#csv-scala.collection.Seq-)
{quote}{{PERMISSIVE}} : when it meets a corrupted record, puts the malformed 
string into a field configured by {{columnNameOfCorruptRecord}}, and sets other 
fields to {{null}}. To keep corrupt records, an user can set a string type 
field named {{columnNameOfCorruptRecord}} in an user-defined schema. If a 
schema does not have the field, it drops corrupt records during parsing. A 
record with less/more tokens than schema is not a corrupted record to CSV. When 
it meets a record having fewer tokens than the length of the schema, sets 
{{null}} to extra fields. When the record has more tokens than the length of 
the schema, it drops extra tokens.
{quote}
Note that in the example above, no user-defined schema is supplied. Instead, the 
CSV header is being used to set the column names, given `.option("header", "true")`.

It's not clear from the documentation snippet above what the behaviour should 
be, but intuitively I would expect that if you use `PERMISSIVE` and don't 
supply a schema, the corrupt record column would be added to the generated 
schema (in this case derived from the header).

As far as I can see, there is no way to put corrupt records in the 
corresponding field _unless_ you supply a schema that includes the corrupt 
record column. If you want to read the column names from the header of the 
CSV, that means reading the header yourself and creating the schema before 
reading the CSV with Spark, e.g. as sketched below.
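
A minimal sketch of that workaround (my own illustration, assuming the 
`csvFile` and `sqlContext` from the example in the description; the `corrupt` 
column name is an arbitrary choice that just has to match the 
`columnNameOfCorruptRecord` option, and all fields are read as strings for 
simplicity):
{code:java}
import scala.io.Source

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Read the header line ourselves (the example file starts with a blank line).
val src = Source.fromFile(csvFile)
val header = try src.getLines().find(_.trim.nonEmpty).get finally src.close()

// Build the schema from the header, prepending the corrupt record column.
val schema = StructType(
  StructField("corrupt", StringType) +:
    header.split(",").map(name => StructField(name.trim, StringType))
)

val df = sqlContext.read
  .format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "corrupt")
  .schema(schema)
  .load(csvFile.getAbsolutePath)
{code}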

Also, with the current implementation, given the example above, data is 
silently lost (the "d*" value). For `PERMISSIVE` mode this looks a bit 
dangerous. With `DROPMALFORMED` it would be OK, since you are explicitly 
telling Spark to drop bad data, but with `PERMISSIVE` I'd expect the loss to 
be reported somehow (in the corrupt record column ;) )
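
For what it's worth, once the schema workaround above is in place, the corrupt 
rows can at least be surfaced for reporting (a sketch, assuming the `df` from 
the snippet above; the `cache()` is there because, since Spark 2.3, queries on 
raw CSV files that reference only the corrupt record column are disallowed 
unless the parsed result is cached):
{code:java}
import org.apache.spark.sql.functions.col

df.cache()

// Report the bad rows, then keep only the clean ones.
df.filter(col("corrupt").isNotNull).show(truncate = false)
val clean = df.filter(col("corrupt").isNull).drop("corrupt")
{code}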

Looks like the issue in SPARK-28058 covers a different case, doesn't it? 
Related, though.

 

> CSV fails to detect corrupt record unless "columnNameOfCorruptRecord" is 
> manually added to the schema
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-28079
>                 URL: https://issues.apache.org/jira/browse/SPARK-28079
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2, 2.4.3
>            Reporter: F Jimenez
>            Priority: Major
>
> When reading a CSV with mode = "PERMISSIVE", corrupt records are not flagged 
> as such and are read in anyway. The only way to get them flagged is to 
> manually set "columnNameOfCorruptRecord" AND manually set a schema that 
> includes this column. Example:
> {code:java}
> import java.io.File
> import java.nio.charset.StandardCharsets
>
> import org.apache.commons.io.FileUtils
>
> // Second row has a 4th column that is not declared in the header/schema
> val csvText = s"""
>                  | FieldA, FieldB, FieldC
>                  | a1,b1,c1
>                  | a2,b2,c2,d*""".stripMargin
> val csvFile = new File("/tmp/file.csv")
> FileUtils.write(csvFile, csvText, StandardCharsets.UTF_8)
> val reader = sqlContext.read
>   .format("csv")
>   .option("header", "true")
>   .option("mode", "PERMISSIVE")
>   .option("columnNameOfCorruptRecord", "corrupt")
>   .schema("corrupt STRING, fieldA STRING, fieldB STRING, fieldC STRING")
> reader.load(csvFile.getAbsolutePath).show(truncate = false)
> {code}
> This produces the correct result:
> {code:java}
> +------------+------+------+------+
> |corrupt     |fieldA|fieldB|fieldC|
> +------------+------+------+------+
> |null        | a1   |b1    |c1    |
> | a2,b2,c2,d*| a2   |b2    |c2    |
> +------------+------+------+------+
> {code}
> However, removing the `schema` call and going:
> {code:java}
> val reader = sqlContext.read
>   .format("csv")
>   .option("header", "true")
>   .option("mode", "PERMISSIVE")
>   .option("columnNameOfCorruptRecord", "corrupt")
> reader.load(csvFile.getAbsolutePath).show(truncate = false)
> {code}
> Yields:
> {code:java}
> +-------+-------+-------+
> | FieldA| FieldB| FieldC|
> +-------+-------+-------+
> | a1    |b1     |c1     |
> | a2    |b2     |c2     |
> +-------+-------+-------+
> {code}
> The fourth value "d*" in the second row has been silently dropped and the row 
> has not been marked as corrupt.
>  


