[ https://issues.apache.org/jira/browse/SPARK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847734#comment-16847734 ]
Ladislav Jech commented on SPARK-27593:
---------------------------------------

I cannot get this working. I have the following code:

{code:python}
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col
from pyspark.sql.types import (DoubleType, IntegerType, StringType,
                               StructField, StructType)


def main():
    customSchema = StructType([
        StructField("project", StringType(), True),
        StructField("article", StringType(), True),
        StructField("requests", IntegerType(), True),
        StructField("bytes_served", DoubleType(), True)
    ])

    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()

    malformedRecords = []
    sc = spark.sparkContext
    sc.setLogLevel("INFO")
    sqlContext = SQLContext(sc)

    dataframe = sqlContext.read.format("csv") \
        .option("schema", customSchema) \
        .option("quote", "") \
        .option("inferSchema", False) \
        .option("sep", ",") \
        .option("mode", "PERMISSIVE") \
        .option("columnNameOfCorruptRecord", "ErrorField") \
        .option("multiLine", False) \
        .option("path", "/home/zangetsu/data.csv") \
        .load()

    total_row_count = dataframe.count()
    dataframe.printSchema()
    dataframe.show()
    print("total_row_count = " + str(total_row_count))

    errors = dataframe.filter(col("ErrorField").isNotNull())
    errors.show()


if __name__ == "__main__":
    main()
{code}

And the following CSV content:

{code}
alfa,beta,gamma,delta
1,2,3,4
something, some
thing else, s,s
{code}

So the first two rows are OK per the schema; the other two are not. The first issue I noticed is that the schema is not reflected on the DataFrame:

{code}
+----------+-----+-----+-----+
|       _c0|  _c1|  _c2|  _c3|
+----------+-----+-----+-----+
|      alfa| beta|gamma|delta|
|         1|    2|    3|    4|
| something| some| null| null|
|thing else|    s|    s| null|
+----------+-----+-----+-----+
{code}

So all the data is loaded, and the column names are not those provided by the custom schema. It also means the ErrorField column is not available. (See the sketches at the end of this message.)

> CSV Parser returns 2 DataFrame - Valid and Malformed DFs
> --------------------------------------------------------
>
>                 Key: SPARK-27593
>                 URL: https://issues.apache.org/jira/browse/SPARK-27593
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 2.4.2
>            Reporter: Ladislav Jech
>            Priority: Major
>
> When we process CSV in any kind of data warehouse, it is common procedure to
> report corrupted records for audit purposes and to give feedback to the
> vendor so they can improve their process. CSV is no different from XSD from
> the perspective that it defines a schema, although in a very limited way (in
> some cases only as a number of columns, without even headers, and without
> types). But when I validate an XML document against an XSD file, I get an
> exact report of whether the file is completely valid, and if not, an exact
> report of which records do not follow the schema.
> Such a feature would have big value in Spark for CSV: get the malformed
> records into some DataFrame, with a line count (pointer within the data
> object), so I can log both the pointer and the real data (line/row) and
> trigger an action on this unfortunate event.
> The load() method could return an array of DataFrames (valid, invalid).
> PERMISSIVE mode isn't enough, because it fills missing fields with nulls,
> which makes it even harder to detect what is really wrong. Another approach
> at the moment is to read with both PERMISSIVE and DROPMALFORMED modes into
> two DataFrames and compare them against each other.
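A likely cause of the behaviour in the snippet above, for what it's worth: the schema is passed via .option("schema", customSchema), but reader options are plain string key/value pairs and the CSV source has no "schema" option, so the StructType is silently ignored and Spark falls back to the default _c0.._c3 names. Below is a minimal sketch of the corrected read, under the assumption that the schema is set through DataFrameReader.schema() and that ErrorField is declared in that schema as a StringType, which PERMISSIVE mode requires before it will populate the corrupt-record column:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import (DoubleType, IntegerType, StringType,
                               StructField, StructType)

spark = SparkSession.builder.getOrCreate()

# The corrupt-record column is declared as part of the schema itself;
# PERMISSIVE mode only fills a column that already exists in the schema.
customSchema = StructType([
    StructField("project", StringType(), True),
    StructField("article", StringType(), True),
    StructField("requests", IntegerType(), True),
    StructField("bytes_served", DoubleType(), True),
    StructField("ErrorField", StringType(), True),
])

dataframe = spark.read.format("csv") \
    .schema(customSchema) \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "ErrorField") \
    .load("/home/zangetsu/data.csv") \
    .cache()  # since Spark 2.3, queries over raw CSV that reference only the
              # corrupt-record column are disallowed; caching avoids that

# Split the single parsed DataFrame into a valid and a malformed part.
valid = dataframe.filter(col("ErrorField").isNull()).drop("ErrorField")
invalid = dataframe.filter(col("ErrorField").isNotNull())
valid.show()
invalid.show()
{code}

With that in place, printSchema() should show the five declared columns, and the isNotNull() filter from the original snippet becomes meaningful, with ErrorField carrying the raw malformed line.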
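The description also mentions reading in both modes and comparing the results. A sketch of that workaround, assuming DataFrame.exceptAll (available since Spark 2.4); note it only recovers the null-filled parsed rows, not the original raw lines, which is why the corrupt-record column above is richer:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.types import (DoubleType, IntegerType, StringType,
                               StructField, StructType)

spark = SparkSession.builder.getOrCreate()

baseSchema = StructType([
    StructField("project", StringType(), True),
    StructField("article", StringType(), True),
    StructField("requests", IntegerType(), True),
    StructField("bytes_served", DoubleType(), True),
])

path = "/home/zangetsu/data.csv"

# Same file read twice: PERMISSIVE keeps every row (malformed fields become
# null), while DROPMALFORMED silently drops the malformed rows.
permissive = spark.read.format("csv").schema(baseSchema) \
    .option("mode", "PERMISSIVE").load(path)
dropped = spark.read.format("csv").schema(baseSchema) \
    .option("mode", "DROPMALFORMED").load(path)

# Rows present under PERMISSIVE but absent under DROPMALFORMED are the
# malformed ones (as null-filled parses, not the original text).
malformed = permissive.exceptAll(dropped)
malformed.show()
{code}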