[ 
https://issues.apache.org/jira/browse/SPARK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847787#comment-16847787
 ] 

Ladislav Jech commented on SPARK-27593:
---------------------------------------

OK, sorry, this was my misunderstanding of some of the options. The following 
code works fine:
{code:python}
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col
from pyspark.sql.types import StructField, StringType, StructType


def main():
    # The last column receives the raw text of any row that fails to parse.
    columns = ["col_1", "col_2", "col_3", "col_4", "error_record"]
    columns_struct_fields = [StructField(field_name, StringType(), True)
                             for field_name in columns]
    customSchema = StructType(columns_struct_fields)

    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()
    malformedRecords = []
    sc = spark.sparkContext
    sc.setLogLevel("INFO")
    sqlContext = SQLContext(sc)
    dataframe = sqlContext.read.csv(
        schema=customSchema,
        quote="",
        header=False,
        inferSchema=False,
        sep=",",
        mode="PERMISSIVE",
        columnNameOfCorruptRecord="error_record",
        multiLine=False,
        path="/home/zangetsu/data.csv")
    # Since Spark 2.3, a query on a raw CSV file that references only the
    # corrupt record column is rejected, so cache the parsed result first.
    dataframe.cache()

    total_row_count = dataframe.count()
    dataframe.printSchema()
    dataframe.show()
    print("total_row_count = " + str(total_row_count))

    # Malformed rows are the ones whose corrupt record column is not null.
    errors = dataframe.filter(col("error_record").isNotNull())
    errors.show()
    error_count = errors.count()
    print("errors count = " + str(error_count))

    for obj in malformedRecords:
        print(obj)


if __name__ == "__main__":
    main()
{code}
{code:java}
+----------+-----+-----+-----+---------------+
|     col_1|col_2|col_3|col_4|   error_record|
+----------+-----+-----+-----+---------------+
|      alfa| beta|gamma|delta|           null|
|         1|    2|    3|    4|           null|
| something| some| null| null|something, some|
|thing else|    s|    s| null|thing else, s,s|
+----------+-----+-----+-----+---------------+
total_row_count = 4{code}

> CSV Parser returns 2 DataFrame - Valid and Malformed DFs
> --------------------------------------------------------
>
>                 Key: SPARK-27593
>                 URL: https://issues.apache.org/jira/browse/SPARK-27593
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 2.4.2
>            Reporter: Ladislav Jech
>            Priority: Major
>
> When we process CSV in any kind of data warehouse, it's common procedure to 
> report corrupted records for audit purposes and as feedback to the vendor, so 
> they can improve their process. CSV is no different from XSD in that it 
> defines a schema, although in a very limited way (in some cases only as the 
> number of columns, without even headers, and without types). But when I check 
> an XML document against an XSD file, I get an exact report of whether the 
> file is completely valid and, if not, which records do not follow the schema. 
> Such a feature would have big value in Spark for CSV: get malformed records 
> into a dataframe, with a line number (a pointer within the data object), so I 
> can log both the pointer and the real data (line/row) and trigger an action 
> on this unfortunate event.
> The load() method could return an array of DFs (Valid, Invalid).
> PERMISSIVE mode isn't enough, because it fills missing fields with nulls, so 
> it is even harder to detect what is really wrong. Another approach at the 
> moment is to read the file in both PERMISSIVE and DROPMALFORMED modes into 2 
> dataframes and compare them against each other.
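
The (Valid, Invalid) split with a line pointer described above can be sketched in plain Python, outside Spark, to make the requested semantics concrete. This is only an illustration under assumptions: `split_csv_lines` and `expected_columns` are hypothetical names, not part of any Spark API, the sample rows merely resemble the output shown in the comment, and "malformed" here means nothing more than a wrong column count.

```python
import csv


def split_csv_lines(lines, expected_columns):
    """Split raw CSV lines into parsed valid rows and malformed raw lines.

    Hypothetical helper: a row counts as malformed only when its number of
    fields differs from expected_columns. Malformed entries keep their
    1-based line number so they can be reported back to the data vendor.
    """
    valid = []
    malformed = []
    for line_number, raw in enumerate(lines, start=1):
        text = raw.rstrip("\r\n")
        # Parse one line at a time so a bad line cannot affect its neighbours.
        fields = next(csv.reader([text]), [])
        if len(fields) == expected_columns:
            valid.append(fields)
        else:
            malformed.append((line_number, text))
    return valid, malformed


# Illustrative sample data, not taken from a real file.
sample = [
    "alfa,beta,gamma,delta",
    "1,2,3,4",
    "something,some",
    "thing else,s,s",
]
valid_rows, malformed_rows = split_csv_lines(sample, expected_columns=4)
print(valid_rows)
print(malformed_rows)
```

Unlike PERMISSIVE mode, nothing is padded with nulls here: a short row is kept verbatim together with its position, which is the information the audit report needs.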


