[ 
https://issues.apache.org/jira/browse/SPARK-36983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mike updated SPARK-36983:
-------------------------
    Attachment:     (was: file2.parquet)

> ignoreCorruptFiles does not work when the schema changes from int to string 
> and a file has more than X records
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-36983
>                 URL: https://issues.apache.org/jira/browse/SPARK-36983
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.8, 3.1.2
>            Reporter: mike
>            Priority: Major
>
> Precondition:
> Spark 3.1 running locally on my MacBook Pro (16 GB RAM, i7, 2015).
> Folder A contains two parquet files:
>  * File 1: has several columns, one of which is column X with data type Int, 
> and holds only one record
>  * File 2: same schema as File 1, except that column X has data type String 
> and it holds >= X records
> X depends on the capacity of your computer; in my case it is 36.
> Steps:
> Read file 1 to get its schema.
> Read folder A with the schema of file 1.
> Expected: the read succeeds and file 2 is ignored, since the data type of 
> column X changed to string.
> Actual: file 2 does not seem to be ignored and the read fails with:
>  `WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.78 
> executor driver): java.lang.UnsupportedOperationException: 
> org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
>  at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:45)`
>  
> If I remove one record from file 2, it works well.
>  
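> To make the type mismatch visible, here is a minimal sketch (the per-file 
> paths are assumptions based on the folder layout described above) that 
> prints each file's schema:
> {code:python}
> # Hypothetical per-file paths inside the folder used below.
> folder_path = 's3://xxx-data-dev/test/ignore_corrupt_files'
> file1_path = f'{folder_path}/file1.parquet'
> file2_path = f'{folder_path}/file2.parquet'
> # Reading each file on its own should show column X (client_sk) as int in
> # file 1 and as string in file 2: the mismatch that makes file 2 "corrupt"
> # relative to file 1's schema.
> spark.read.parquet(file1_path).printSchema()
> spark.read.parquet(file2_path).printSchema()
> {code}
>  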
>  Code using the existing files:
> {code:python}
> # Ignore files that cannot be read with the requested schema.
> spark.conf.set('spark.sql.files.ignoreCorruptFiles', True)
> folder_path = 's3://xxx-data-dev/test/ignore_corrupt_files'
> file1_path = f'{folder_path}/file1.parquet'
> # Take the schema from file 1 (column X is int here).
> file1_schema = spark.read.parquet(file1_path).schema
> # Read the whole folder with file 1's schema; file 2 should be ignored.
> file_all_df = spark.read.schema(file1_schema).parquet(folder_path)
> file_all_df.show(n=100000)
> {code}
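>  
> For isolation, a small sketch (same hypothetical per-file paths as above) 
> that reads file 2 alone with file 1's schema; assuming the stack trace above 
> comes from decoding file 2's string dictionary as int, this should reproduce 
> the same UnsupportedOperationException without the multi-file read in the 
> picture:
> {code:python}
> # Force file 1's schema (column X as int) onto file 2 (column X as string).
> # Expectation under the assumption above: the vectorized reader fails while
> # decoding the string dictionary into an int column.
> spark.read.schema(file1_schema).parquet(file2_path).show()
> {code}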
> Code that creates the files:
> {code:python}
> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
> 
> # Ignore files that cannot be read with the requested schema.
> spark.conf.set('spark.sql.files.ignoreCorruptFiles', True)
> 
> # File 1: client_sk is an int and there is a single record.
> schema1 = StructType([
>     StructField("program_sk", IntegerType(), True),
>     StructField("client_sk", IntegerType(), True),
> ])
> sample_data = [(1, 17)]
> df1 = spark.createDataFrame(sample_data, schema1)
> 
> # File 2: same columns, but client_sk is a string and there are 36 records
> # (the threshold X on my machine).
> schema2 = StructType([
>     StructField("program_sk", IntegerType(), True),
>     StructField("client_sk", StringType(), True),
> ])
> sample_data = [(1, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
>                (2, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
>                (3, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
>                (4, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
>                (5, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
>                (6, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
>                (7, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
>                (8, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
>                (9, "19999"), (2, "3332"), (3, "199999"), (4, "3333")]
> df2 = spark.createDataFrame(sample_data, schema2)
> 
> # Write both dataframes into the same folder so that it mixes the two
> # incompatible schemas.
> file_save_path = 's3://xxx-data-dev/adp_data_lake/test_ignore_corrupt/'
> df1.write \
>     .mode('overwrite') \
>     .format('parquet') \
>     .save(file_save_path)
> df2.write \
>     .mode('append') \
>     .format('parquet') \
>     .save(file_save_path)
> 
> # Read the folder back with file 1's schema; file 2 should be ignored, but
> # the read fails instead.
> df = spark.read.schema(schema1).parquet(file_save_path)
> df.show()
> {code}
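>  
> As a diagnostic (an assumption on my side, not a confirmed workaround): the 
> stack trace points into the dictionary decode path of the vectorized Parquet 
> reader, so re-running the read with the vectorized reader disabled may show 
> whether ignoreCorruptFiles behaves differently on the row-based path:
> {code:python}
> # spark.sql.parquet.enableVectorizedReader is a standard Spark SQL config;
> # disabling it makes Spark fall back to the row-based parquet-mr reader.
> # Whether ignoreCorruptFiles then kicks in for file 2 is exactly what this
> # ticket is about, so treat this as a probe rather than a fix.
> spark.conf.set('spark.sql.parquet.enableVectorizedReader', False)
> spark.read.schema(schema1).parquet(file_save_path).show()
> {code}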



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
