[ https://issues.apache.org/jira/browse/SPARK-36983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
mike updated SPARK-36983: ------------------------- Description: Precondition: Spark 3.1 run locally on a MacBook Pro (16 GB RAM, i7, 2015).

Folder A contains two parquet files:
 * File 1: has several columns, one of which is column X with data type Int, and contains only one record.
 * File 2: same schema as File 1, except that column X has data type String, and contains >= X records. X depends on the capacity of your machine; in my case it is 36.

Steps: read file 1 to obtain its schema, then read folder A with the schema of file 1.

Expected: the read succeeds and file 2 is ignored, since the data type of column X changed to String.

Actual: file 2 is not ignored and the read fails with:
{noformat}
WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.78 executor driver): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:45)
{noformat}

If I remove one record from file 2, the read succeeds.
Code reading the existing files:
{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.conf.set('spark.sql.files.ignoreCorruptFiles', True)

folder_path = 's3://xxx-data-dev/test/ignore_corrupt_files'
file1_path = f'{folder_path}/file1.parquet'

# Read file 1 to obtain its schema, then read the whole folder with it.
file1_schema = spark.read.parquet(file1_path).schema
file_all_df = spark.read.schema(file1_schema).parquet(folder_path)
file_all_df.show(n=100000)
{code}

Code creating the files:
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

spark.conf.set('spark.sql.files.ignoreCorruptFiles', True)

# File 1: client_sk is an Int and there is a single record.
schema1 = StructType([
    StructField("program_sk", IntegerType(), True),
    StructField("client_sk", IntegerType(), True),
])
sample_data = [(1, 17)]
df1 = spark.createDataFrame(sample_data, schema1)

# File 2: same columns, but client_sk is a String and there are 36 records.
schema2 = StructType([
    StructField("program_sk", IntegerType(), True),
    StructField("client_sk", StringType(), True),
])
sample_data = [(1, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (2, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (3, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (4, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (5, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (6, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (7, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (8, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (9, "19999"), (2, "3332"), (3, "199999"), (4, "3333")]
df2 = spark.createDataFrame(sample_data, schema2)

file_save_path = 's3://xxx-data-dev/adp_data_lake/test_ignore_corrupt/'
df1.write \
    .mode('overwrite') \
    .format('parquet') \
    .save(file_save_path)
df2.write \
    .mode('append') \
    .format('parquet') \
    .save(file_save_path)

# Reading the folder with schema1 should skip file 2, but fails instead.
df = spark.read.schema(schema1).parquet(file_save_path)
df.show()
{code}

> ignoreCorruptFiles does not work when schema change from int to string when a
> file having more than X records
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-36983
>                 URL: https://issues.apache.org/jira/browse/SPARK-36983
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.8, 3.1.2
>            Reporter: mike
>            Priority: Major
>         Attachments: file1.parquet, file2.parquet
>
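As a toy illustration of the behaviour reported above (this is plain Python, not Spark's actual implementation): a skip-on-corruption filter can only swallow the error classes it was written to catch, so a type-decoding failure raised as a different exception class still aborts the read even with skipping enabled. All names below (`CorruptFileError`, `TypeDecodeError`, `read_folder`) are hypothetical stand-ins.

```python
class CorruptFileError(IOError):
    """Stand-in for the I/O-level corruption that the skip logic handles."""


class TypeDecodeError(Exception):
    """Stand-in for the UnsupportedOperationException from decodeToInt."""


def read_file(name):
    """Hypothetical per-file reader mirroring the files in the report."""
    if name == "file1.parquet":
        return [(1, 17)]
    if name == "truncated.parquet":
        raise CorruptFileError(name)
    # file2: column stored as string but decoded with an int schema
    raise TypeDecodeError("decodeToInt on a binary dictionary")


def read_folder(files, ignore_corrupt_files=True):
    """Read every file, skipping only recognised corruption errors."""
    rows = []
    for f in files:
        try:
            rows.extend(read_file(f))
        except CorruptFileError:
            if not ignore_corrupt_files:
                raise
            # Skipped, mirroring spark.sql.files.ignoreCorruptFiles.
    return rows
```

With this sketch, `read_folder(["file1.parquet", "truncated.parquet"])` returns only file 1's row, while `read_folder(["file1.parquet", "file2.parquet"])` raises `TypeDecodeError` despite the flag, which is the shape of the behaviour the report describes.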
--
This message was sent by Atlassian Jira
(v8.3.4#803005)