[ https://issues.apache.org/jira/browse/SPARK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847787#comment-16847787 ]

Ladislav Jech edited comment on SPARK-27593 at 5/24/19 6:15 PM:
----------------------------------------------------------------

Ok, sorry, this was my misunderstanding of some of the options; the following 
code works fine:
{code:java}
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, input_file_name
from pyspark.sql.types import StringType, StructField, StructType


def main():
    # Extra column that will hold the raw text of any malformed record
    # (see columnNameOfCorruptRecord below).
    columns = ["col_1", "col_2", "col_3", "col_4", "error_record"]
    columns_struct_fields = [StructField(field_name, StringType(), True)
                             for field_name in columns]
    customSchema = StructType(columns_struct_fields)

    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("INFO")
    sqlContext = SQLContext(sc)
    dataframe = sqlContext.read.csv(
        schema=customSchema,
        quote="",
        header=False,
        inferSchema=False,
        sep=",",
        mode="PERMISSIVE",
        columnNameOfCorruptRecord="error_record",
        multiLine=False,
        path=["/home/zangetsu/data.csv", "/home/zangetsu/data2.csv"])
    # Track which source file each row came from.
    dataframe = dataframe.withColumn("file_name", input_file_name())
    # Cache so the corrupt-record column can be queried on its own
    # (Spark otherwise rejects queries referencing only that column).
    dataframe.cache()

    total_row_count = dataframe.count()
    dataframe.printSchema()
    dataframe.show()
    print("total_row_count = " + str(total_row_count))

    # Malformed rows are exactly those where error_record is populated.
    errors = dataframe.filter(col("error_record").isNotNull())
    errors.show()
    error_count = errors.count()
    print("errors count = " + str(error_count))


if __name__ == "__main__":
    main(){code}
{code:java}
+-----------+-----+-----+-----+----------------+--------------------+
|      col_1|col_2|col_3|col_4|    error_record|           file_name|
+-----------+-----+-----+-----+----------------+--------------------+
|      1alfa| beta|gamma|delta|            null|file:///home/zang...|
|          2|    2|    3|    4|            null|file:///home/zang...|
| 3something| some| null| null|3something, some|file:///home/zang...|
|4thing else|    s|    s| null|4thing else, s,s|file:///home/zang...|
|      prvni| beta|gamma|delta|            null|file:///home/zang...|
|      druha|    2|    3|    4|            null|file:///home/zang...|
|      treti| some| null| null|     treti, some|file:///home/zang...|
|     ctvrta|    s|    s| null|     ctvrta, s,s|file:///home/zang...|
+-----------+-----+-----+-----+----------------+--------------------+{code}
Eventually it would be even better if I could also get the line number, but 
that is a small extension. Thanks for your patience with me; this feature in 
Spark is enough for me to do the reporting.
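
Not part of the snippet above, but for illustration: one way to approximate 
the requested line numbers with the existing API is to read the raw file as 
text and zip each line with its index. A minimal sketch, assuming single-line 
records (multiLine=False); the path and column names are placeholders.
{code:java}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LineNumbers").getOrCreate()

# Pair every raw line with its 0-based position in the file.
# zipWithIndex() preserves line order, so with single-line records the
# index is effectively the line number.
lines_df = (spark.sparkContext
            .textFile("/home/zangetsu/data.csv")
            .zipWithIndex()
            .toDF(["raw_line", "line_number"]))

# Malformed rows keep their raw text in error_record, so a join on
# raw_line == error_record would attach a line number to each corrupt
# record (assuming the raw text survives parsing unchanged):
#   errors.join(lines_df, errors["error_record"] == lines_df["raw_line"])
lines_df.show()
{code}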



> CSV Parser returns 2 DataFrame - Valid and Malformed DFs
> --------------------------------------------------------
>
>                 Key: SPARK-27593
>                 URL: https://issues.apache.org/jira/browse/SPARK-27593
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 2.4.2
>            Reporter: Ladislav Jech
>            Priority: Major
>
> When we process CSV in any kind of data warehouse, it is common procedure to 
> report corrupted records for audit purposes and to feed them back to the 
> vendor so they can improve their process. CSV is no different from XSD in 
> the sense that it defines a schema, although in a very limited way (in some 
> cases only as a number of columns, without headers or types). But when I 
> check an XML document against an XSD file, I get an exact report of whether 
> the file is completely valid, and if not, exactly which records violate the 
> schema. 
> Such a feature would be of big value in Spark for CSV: get the malformed 
> records into a dataframe, with a line count (a pointer within the data 
> object), so I can log both the pointer and the real data (line/row) and 
> trigger an action on this unfortunate event.
> The load() method could return an array of DataFrames (valid, invalid).
> PERMISSIVE mode isn't enough, since it fills missing fields with nulls, 
> which makes it even harder to detect what is really wrong. Another approach 
> at the moment is to read the data in both permissive and dropmalformed modes 
> into two dataframes and compare them against each other.
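
For reference, the valid/invalid split requested above can be approximated 
today with the corrupt record column used in the comment: one permissive 
read, then two filters. A sketch with illustrative paths and column names, 
not the proposed API.
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("SplitValidInvalid").getOrCreate()

# Extra column that PERMISSIVE mode fills with the raw text of bad records.
schema = StructType([StructField(c, StringType(), True) for c in
                     ["col_1", "col_2", "col_3", "col_4", "error_record"]])

df = spark.read.csv("/home/zangetsu/data.csv",
                    schema=schema,
                    mode="PERMISSIVE",
                    columnNameOfCorruptRecord="error_record")

# Cache so the corrupt-record column can be queried on its own.
df.cache()

# One read, two DataFrames: the split the ticket asks for.
invalid_df = df.filter(col("error_record").isNotNull())
valid_df = df.filter(col("error_record").isNull()).drop("error_record")
{code}
This keeps a single pass over the data instead of comparing permissive and 
dropmalformed reads.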


