[ https://issues.apache.org/jira/browse/SPARK-38402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502532#comment-17502532 ]

Dilip Biswal commented on SPARK-38402:
--------------------------------------

[~hyukjin.kwon] Thanks!!
Yeah, that should work. The only thing is, it puts an extra burden on the 
application to be aware of the context (i.e. accessing the error data frame) and 
to do this additional branching. We were wondering if this could be done 
implicitly by the runtime. After all, we are simply trying to run an operation 
on a data frame that Spark returned to us.
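For context, a rough sketch of what that application-side branching looks like for us today. It reuses the schema and file path from the issue description below, assumes an active spark session in a shell/notebook, and follows the cache-before-filter workaround the error message suggests; the quarantine path is just a placeholder.

{code:java}
import org.apache.spark.sql.types._
import spark.implicits._

// Same intentionally mistyped schema as in the issue description; the
// _corrupt_record column captures rows that fail to parse.
val schema = new StructType()
  .add("id", IntegerType, true)
  .add("weight", IntegerType, true)
  .add("price", IntegerType, true)
  .add("_corrupt_record", StringType, true)

// Parse in PERMISSIVE mode and cache, as the AnalysisException suggests, so
// that predicates on _corrupt_record are allowed at all.
val parsed = spark.read.format("csv")
  .option("header", "true")
  .option("mode", "PERMISSIVE")
  .schema(schema)
  .load("/FileStore/tables/csv_corrupt_record.csv")
  .cache()

// The application has to branch explicitly on the corrupt-record column.
val badRows  = parsed.filter($"_corrupt_record".isNotNull)
val goodRows = parsed.filter($"_corrupt_record".isNull).drop("_corrupt_record")

if (badRows.count() > 0) {
  // Placeholder location: route bad rows somewhere for inspection.
  badRows.write.mode("overwrite").json("/tmp/bad_rows")
}
// goodRows continues down the processing pipeline.
{code}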

> Improve user experience when working on data frames created from CSV and JSON 
> in PERMISSIVE mode.
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38402
>                 URL: https://issues.apache.org/jira/browse/SPARK-38402
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: Dilip Biswal
>            Priority: Major
>
> In our data processing pipeline, we first process the user-supplied data and 
> eliminate invalid/corrupt records. We parse JSON and CSV files in PERMISSIVE 
> mode, where all invalid records are captured in "_corrupt_record", and then 
> apply predicates on "_corrupt_record" to eliminate the bad records before 
> sending the good records further down the processing pipeline.
> We encountered two issues.
> 1. The introduction of "predicate pushdown" for CSV does not take into 
> account the system-generated "_corrupt_record" column and tries to push 
> predicates on it down to the scan, resulting in an exception because the 
> column is not part of the base schema.
> 2. Applying predicates on the "_corrupt_record" column results in an 
> AnalysisException like the following.
> {code:java}
> import org.apache.spark.sql.types._
> import spark.implicits._
>
> // The weight field is defined wrongly: the actual data contains floating-point
> // numbers, while the schema specifies an integer. The schema also contains the
> // special column _corrupt_record, which does not exist in the data; it
> // captures rows that did not parse correctly.
> val schema = new StructType()
>   .add("id", IntegerType, true)
>   .add("weight", IntegerType, true)
>   .add("price", IntegerType, true)
>   .add("_corrupt_record", StringType, true)
>
> val csv_with_wrong_schema = spark.read.format("csv")
>   .option("header", "true")
>   .schema(schema)
>   .load("/FileStore/tables/csv_corrupt_record.csv")
>
> val badRows = csv_with_wrong_schema.filter($"_corrupt_record".isNotNull)
> val numBadRows = badRows.count()
> {code}
> Running the count fails with:
> {code}
> Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
> referenced columns only include the internal corrupt record column
> (named _corrupt_record by default). For example:
> spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
> and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
> Instead, you can cache or save the parsed results and then send the same 
> query.
> For example, val df = spark.read.schema(schema).csv(file).cache() and then
> df.filter($"_corrupt_record".isNotNull).count().
> {code}
> For (1), we have disabled predicate pushdown (see the sketch below).
> For (2), we currently cache the data frame before using it; however, that is 
> not convenient, and we would like to see a better user experience.
>  
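> A minimal sketch of how (1) can be done per session, assuming the relevant 
> switches are spark.sql.csv.filterPushdown.enabled and 
> spark.sql.json.filterPushdown.enabled (the exact config names are an 
> assumption here, not part of the report above):
> {code:java}
> // Turn off filter pushdown for the CSV (and JSON) sources so that predicates
> // on _corrupt_record are not pushed into the scan.
> spark.conf.set("spark.sql.csv.filterPushdown.enabled", "false")
> spark.conf.set("spark.sql.json.filterPushdown.enabled", "false")
> {code}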



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
