Dilip Biswal created SPARK-38402:
------------------------------------

Summary: Improve user experience when working on data frames created from CSV and JSON in PERMISSIVE mode.
Key: SPARK-38402
URL: https://issues.apache.org/jira/browse/SPARK-38402
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.2.1
Reporter: Dilip Biswal
In our data processing pipeline, we first process the user-supplied data and eliminate invalid/corrupt records. We parse JSON and CSV files in PERMISSIVE mode so that all invalid records are captured in the "_corrupt_record" column, and we then apply predicates on "_corrupt_record" to eliminate the bad records before passing the good records further down the pipeline. We encountered two issues.

1. The predicate pushdown introduced for CSV does not take this system-generated "_corrupt_record" column into account and tries to push filters on it down to the scan, resulting in an exception because the column is not part of the base schema.
2. Applying predicates on "_corrupt_record" results in an AnalysisException like the following.

{code:java}
val schema = new StructType()
  .add("id", IntegerType, true)
  .add("weight", IntegerType, true)         // The weight field is defined wrongly: the actual data contains floating point numbers, while the schema specifies an integer.
  .add("price", IntegerType, true)
  .add("_corrupt_record", StringType, true) // Special column _corrupt_record, which does not exist in the data; it captures rows that did not parse correctly.

val csv_with_wrong_schema = spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .load("/FileStore/tables/csv_corrupt_record.csv")

val badRows = csv_with_wrong_schema.filter($"_corrupt_record".isNotNull)
val numBadRows = badRows.count()
{code}

This fails with:

{code}
Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query. For example,
val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().
{code}

For (1), we have disabled predicate pushdown. For (2), we currently cache the data frame before using it; however, this is not convenient and we would like to see a better user experience.
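
To illustrate, here is a minimal sketch of how we work around both issues today (the path and schema are the same illustrative ones used above; the configuration shown is the CSV filter-pushdown switch):

{code:java}
import org.apache.spark.sql.types._
import spark.implicits._  // for the $"..." column syntax (already in scope in spark-shell)

// Workaround for (1): turn off CSV filter pushdown so filters on the
// synthetic _corrupt_record column are never pushed down to the scan.
spark.conf.set("spark.sql.csv.filterPushdown.enabled", "false")

// Workaround for (2): cache the parsed DataFrame before referencing
// _corrupt_record, as the AnalysisException message suggests.
val schema = new StructType()
  .add("id", IntegerType, true)
  .add("weight", IntegerType, true)
  .add("price", IntegerType, true)
  .add("_corrupt_record", StringType, true)

val df = spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .load("/FileStore/tables/csv_corrupt_record.csv")
  .cache()  // materializes the parsed rows so _corrupt_record can be filtered on

val numBadRows = df.filter($"_corrupt_record".isNotNull).count()
val goodRows   = df.filter($"_corrupt_record".isNull)
{code}

Caching does unblock the query, but it forces an extra materialization of the parsed data just to be able to filter on _corrupt_record, which is the inconvenience we would like to see removed.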