Dilip Biswal created SPARK-38402:
------------------------------------

             Summary: Improve user experience when working on data frames created from CSV and JSON in PERMISSIVE mode.
                 Key: SPARK-38402
                 URL: https://issues.apache.org/jira/browse/SPARK-38402
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.2.1
            Reporter: Dilip Biswal


In our data processing pipeline, we first process the user-supplied data and 
eliminate invalid/corrupt records. To do that, we parse JSON and CSV files in 
PERMISSIVE mode, where all invalid records are captured in "_corrupt_record". 
We then apply predicates on "_corrupt_record" to eliminate the bad records 
before passing the good records further down the processing pipeline.
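
For context, here is a minimal sketch of the kind of PERMISSIVE-mode read we do. The JSON path, schema, and field names below are hypothetical and only for illustration:
{code:java}
import org.apache.spark.sql.types._
import spark.implicits._

// Hypothetical schema; the real pipeline uses the user-supplied schema plus
// the corrupt-record column.
val jsonSchema = new StructType()
  .add("id", IntegerType, true)
  .add("name", StringType, true)
  .add("_corrupt_record", StringType, true)

val parsed = spark.read.format("json")
  .option("mode", "PERMISSIVE")                            // keep bad rows
  .option("columnNameOfCorruptRecord", "_corrupt_record")  // capture them here
  .schema(jsonSchema)
  .load("/FileStore/tables/json_input")                    // hypothetical path

// Bad records have a non-null _corrupt_record; good records continue downstream.
val goodRows = parsed.filter($"_corrupt_record".isNull)
{code}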

We encountered two issues.
1. The introduction of "predicate pushdown" for CSV does not take into account 
this system-generated "_corrupt_record" column and tries to push it down to the 
scan, resulting in an exception because the column is not part of the base schema.
2. Applying predicates on "_corrupt_record" results in an AnalysisException like 
the following:
{code:java}
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("id", IntegerType, true)
  // The weight field is defined wrongly: the actual data contains floating
  // point numbers, while the schema specifies an integer.
  .add("weight", IntegerType, true)
  .add("price", IntegerType, true)
  // The schema contains a special column _corrupt_record, which does not exist
  // in the data. This column captures rows that did not parse correctly.
  .add("_corrupt_record", StringType, true)

val csv_with_wrong_schema = spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .load("/FileStore/tables/csv_corrupt_record.csv")

val badRows = csv_with_wrong_schema.filter($"_corrupt_record".isNotNull)
val numBadRows = badRows.count()
{code}

The count fails with the following AnalysisException message:

{code:java}
Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().
{code}

For (1), we have disabled predicate pushdown.
For (2), we currently cache the data frame before using it; however, this is 
not convenient and we would like to see a better user experience.
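
For reference, a rough sketch of both workarounds as we apply them today, reusing the schema from the example above. The sketch assumes the CSV pushdown is controlled by the spark.sql.csv.filterPushdown.enabled flag (with spark.sql.json.filterPushdown.enabled as the JSON counterpart) and uses the caching approach suggested by the AnalysisException message:
{code:java}
// (1) Disable CSV filter pushdown so the synthetic "_corrupt_record" column is
// never pushed down to the scan.
spark.conf.set("spark.sql.csv.filterPushdown.enabled", "false")

// (2) Cache the parsed result before filtering on "_corrupt_record", as the
// AnalysisException message suggests.
val cached = spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .load("/FileStore/tables/csv_corrupt_record.csv")
  .cache()

val numBadRows = cached.filter($"_corrupt_record".isNotNull).count()
{code}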
 


