[ https://issues.apache.org/jira/browse/SPARK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847787#comment-16847787 ]
Ladislav Jech edited comment on SPARK-27593 at 5/24/19 6:15 PM:
----------------------------------------------------------------
OK, sorry, this was my misunderstanding of some of the options. The following code works fine:
{code:python}
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, input_file_name
from pyspark.sql.types import StringType, StructField, StructType


def main():
    columns = ["col_1", "col_2", "col_3", "col_4", "error_record"]
    columns_struct_fields = [StructField(field_name, StringType(), True)
                             for field_name in columns]
    customSchema = StructType(columns_struct_fields)

    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()

    malformedRecords = []
    sc = spark.sparkContext
    sc.setLogLevel("INFO")
    sqlContext = SQLContext(sc)

    dataframe = sqlContext.read.csv(
        schema=customSchema,
        quote="",
        header=False,
        inferSchema=False,
        sep=",",
        mode="PERMISSIVE",
        columnNameOfCorruptRecord="error_record",
        multiLine=False,
        path=["/home/zangetsu/data.csv", "/home/zangetsu/data2.csv"])
    dataframe = dataframe.withColumn("file_name", input_file_name())

    total_row_count = dataframe.count()
    dataframe.printSchema()
    dataframe.show()
    print("total_row_count = " + str(total_row_count))

    errors = dataframe.filter(col("error_record").isNotNull())
    errors.show()
    # error_count = errors.count()
    # print("errors count = " + str(error_count))

    for obj in malformedRecords:
        print(obj)


if __name__ == "__main__":
    main()
{code}
{code}
+-----------+-----+-----+-----+----------------+--------------------+
|      col_1|col_2|col_3|col_4|    error_record|           file_name|
+-----------+-----+-----+-----+----------------+--------------------+
|      1alfa| beta|gamma|delta|            null|file:///home/zang...|
|          2|    2|    3|    4|            null|file:///home/zang...|
| 3something| some| null| null|3something, some|file:///home/zang...|
|4thing else|    s|    s| null|4thing else, s,s|file:///home/zang...|
|      prvni| beta|gamma|delta|            null|file:///home/zang...|
|      druha|    2|    3|    4|            null|file:///home/zang...|
|      treti| some| null| null|     treti, some|file:///home/zang...|
|     ctvrta|    s|    s| null|     ctvrta, s,s|file:///home/zang...|
+-----------+-----+-----+-----+----------------+--------------------+
{code}
It would be even better if I could also get the line number, but that is a small extension. Thanks for your patience with me; this feature in Spark is enough for me to do the reporting.
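The comment above wishes for a line number next to each malformed record. A minimal sketch of that idea in plain Python (not Spark; the helper `split_csv` and the constant `EXPECTED_COLS` are hypothetical names introduced here for illustration):

```python
import csv
import io

EXPECTED_COLS = 4

def split_csv(text, expected_cols=EXPECTED_COLS):
    """Split raw CSV text into (valid_rows, malformed), where each
    malformed entry carries its 1-based line number alongside the raw
    record -- the 'pointer within the data object' asked for above."""
    valid, malformed = [], []
    for lineno, fields in enumerate(csv.reader(io.StringIO(text)), start=1):
        if len(fields) == expected_cols:
            valid.append(fields)
        else:
            malformed.append((lineno, ",".join(fields)))
    return valid, malformed

sample = "alfa,beta,gamma,delta\n1,2,3,4\nsomething, some\nthing else, s,s\n"
valid, malformed = split_csv(sample)
# malformed pairs each bad record with its line number:
# [(3, 'something, some'), (4, 'thing else, s,s')]
```

In Spark itself the same pairing could be obtained with an RDD-level `zipWithIndex` before parsing, at the cost of an extra pass.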
was (Author: archenroot):
OK, sorry, this was my misunderstanding of some of the options. The following code works fine:
{code:python}
def main():
    columns = ["col_1", "col_2", "col_3", "col_4", "error_record"]
    columns_struct_fields = [StructField(field_name, StringType(), True)
                             for field_name in columns]
    customSchema = StructType(columns_struct_fields)

    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()

    malformedRecords = []
    sc = spark.sparkContext
    sc.setLogLevel("INFO")
    sqlContext = SQLContext(sc)

    dataframe = sqlContext.read.csv(
        schema=customSchema,
        quote="",
        header=False,
        inferSchema=False,
        sep=",",
        mode="PERMISSIVE",
        columnNameOfCorruptRecord="error_record",
        multiLine=False,
        path="/home/zangetsu/data.csv")

    total_row_count = dataframe.count()
    dataframe.printSchema()
    dataframe.show()
    print("total_row_count = " + str(total_row_count))

    errors = dataframe.filter(col("ErrorField").isNotNull())
    errors.show()
    error_count = errors.count()
    print("errors count = " + str(error_count))

    for obj in malformedRecords:
        print(obj)
{code}
{code}
+----------+-----+-----+-----+---------------+
|     col_1|col_2|col_3|col_4|   error_record|
+----------+-----+-----+-----+---------------+
|      alfa| beta|gamma|delta|           null|
|         1|    2|    3|    4|           null|
| something| some| null| null|something, some|
|thing else|    s|    s| null|thing else, s,s|
+----------+-----+-----+-----+---------------+
total_row_count = 4
{code}

> CSV Parser returns 2 DataFrame - Valid and Malformed DFs
> --------------------------------------------------------
>
>                 Key: SPARK-27593
>                 URL: https://issues.apache.org/jira/browse/SPARK-27593
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 2.4.2
>            Reporter: Ladislav Jech
>            Priority: Major
>
> When we process CSV files in any kind of data warehouse, it is common
> procedure to report corrupted records for audit purposes and as feedback
> to the vendor, so they can enhance their
> procedure. CSV is no different from XSD in the sense that it defines a
> schema, although in a very limited way (in some cases only as a number of
> columns, without headers, and with no types). But when I check an XML
> document against an XSD file, I get an exact report of whether the file
> is completely valid and, if not, exactly which records do not follow the
> schema.
> Such a feature would have big value in Spark for CSV: get the malformed
> records into some dataframe, with a line count (a pointer within the data
> object), so I can log both the pointer and the real data (line/row) and
> trigger an action on this unfortunate event.
> The load() method could return an array of DFs (Valid, Invalid).
> PERMISSIVE mode isn't enough, as it fills missing fields with nulls, so
> it is even harder to detect what is really wrong. Another approach at the
> moment is to read in both permissive and dropmalformed modes into two
> dataframes and compare those against each other.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
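The two-mode workaround mentioned in the issue description (read once with PERMISSIVE, again with DROPMALFORMED, then diff the results) can be modelled outside Spark in a few lines of plain Python; `permissive` and `dropmalformed` below are toy stand-ins for the two parser modes, not Spark APIs:

```python
import csv
import io

def permissive(text, n_cols):
    """Toy model of PERMISSIVE mode: short rows are padded with None
    and the raw record is kept in a trailing 'corrupt record' slot."""
    rows = []
    for fields in csv.reader(io.StringIO(text)):
        if len(fields) == n_cols:
            rows.append(tuple(fields) + (None,))
        else:
            padded = fields + [None] * (n_cols - len(fields))
            rows.append(tuple(padded) + (",".join(fields),))
    return rows

def dropmalformed(text, n_cols):
    """Toy model of DROPMALFORMED mode: rows of the wrong arity vanish."""
    return [tuple(f) + (None,) for f in csv.reader(io.StringIO(text))
            if len(f) == n_cols]

sample = "alfa,beta,gamma,delta\n1,2,3,4\nsomething, some\n"
# Records present under PERMISSIVE but absent under DROPMALFORMED
# are exactly the malformed ones -- the set difference the description
# proposes computing between the two dataframes.
bad = set(permissive(sample, 4)) - set(dropmalformed(sample, 4))
```

This illustrates why the feature request exists: the diff requires two full parses of the input, whereas a reader that returned the valid and invalid records together would need only one.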