[ 
https://issues.apache.org/jira/browse/SPARK-36983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mike updated SPARK-36983:
-------------------------
    Description: 
Precondition:

Folder A contains two Parquet files:
 * File 1: has several columns, one of which is column X with data type Int; it contains only one record.
 * File 2: same schema as File 1, except column X has data type String; it contains >= 36 records.

Read file 1 to get its schema.

Read folder A with the schema of file 1.

Expected: the read succeeds and file 2 is ignored, since the data type of column X changed to String.

Actual: file 2 is not ignored and the read fails with:

{noformat}
WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (192.168.1.78 executor driver): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary
 at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:45)
{noformat}

 

If I remove one record from file 2 (leaving 35 records), it works fine.
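The 35-vs-36 record threshold suggests the writer switches column X to a different encoding (e.g. a dictionary page) once the file is large enough, which would match the {{PlainBinaryDictionary}} in the stack trace. A small inspection sketch to check that, assuming pyarrow is available and the attached file1.parquet / file2.parquet are downloaded to the working directory (the local paths are placeholders):
{code:python}
# Inspection sketch: compare the physical type and encodings of column X
# across the two attached files. Assumes pyarrow is installed and the files
# were downloaded locally; column index 1 matches the repro schema below.
import pyarrow.parquet as pq

for path in ['file1.parquet', 'file2.parquet']:
    meta = pq.ParquetFile(path).metadata
    col = meta.row_group(0).column(1)  # column X
    print(path, col.physical_type, col.encodings)
{code}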

 
Code reading the existing files (the attached file1.parquet and file2.parquet):
{code:python}
spark.conf.set('spark.sql.files.ignoreCorruptFiles', True)
folder_path = 's3://xxx-data-dev/test/ignore_corrupt_files'
file1_path = f'{folder_path}/file1.parquet'
# Read file 1 alone to capture its schema (column X as Int).
file1_schema = spark.read.parquet(file1_path).schema
# Read the whole folder with file 1's schema; file 2 should be ignored.
file_all_df = spark.read.schema(file1_schema).parquet(folder_path)
file_all_df.show(n=100000)
{code}
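The {{Dictionary.decodeToInt}} frame suggests the vectorized Parquet reader is trying to decode a dictionary-encoded String column as Int. A minimal diagnostic sketch, assuming that reader is the failing path ({{spark.sql.parquet.enableVectorizedReader}} is a standard Spark SQL config; whether the row-based parquet-mr path behaves differently here is untested):
{code:python}
# Diagnostic sketch: rerun the folder read with the vectorized Parquet reader
# disabled, falling back to the row-based parquet-mr reader. Assumption: the
# vectorized reader is the failing path (per the decodeToInt stack frame).
spark.conf.set('spark.sql.parquet.enableVectorizedReader', False)
file_all_df = spark.read.schema(file1_schema).parquet(folder_path)
file_all_df.show(n=100000)
{code}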
Code that creates the files from scratch:

 
{code:python}
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark.conf.set('spark.sql.files.ignoreCorruptFiles', True)

# File 1: client_sk (column X) is Int, one record.
schema1 = StructType([
    StructField("program_sk", IntegerType(), True),
    StructField("client_sk", IntegerType(), True),
])
sample_data = [(1, 17)]
df1 = spark.createDataFrame(sample_data, schema1)

# File 2: same columns, but client_sk is String, 36 records.
schema2 = StructType([
    StructField("program_sk", IntegerType(), True),
    StructField("client_sk", StringType(), True),
])
sample_data = [(1, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (2, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (3, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (4, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (5, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (6, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (7, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (8, "19999"), (2, "3332"), (3, "199999"), (4, "3333"),
               (9, "19999"), (2, "3332"), (3, "199999"), (4, "3333")]
df2 = spark.createDataFrame(sample_data, schema2)
file_save_path = 's3://xxx-data-dev/adp_data_lake/test_ignore_corrupt/'

# Write file 1, then append file 2 into the same folder.
df1.write \
    .mode('overwrite') \
    .format('parquet') \
    .save(file_save_path)

df2.write \
    .mode('append') \
    .format('parquet') \
    .save(file_save_path)

# Read the folder back with file 1's schema; file 2 should be ignored.
df = spark.read.schema(schema1).parquet(file_save_path)
df.show()
{code}
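For comparison, since Spark 3.0 the flag can also be passed as a per-read file-source option rather than a session config; whether that changes the outcome for this Int-vs-String mismatch is untested:
{code:python}
# Alternative sketch: 'ignoreCorruptFiles' as a per-read option (a documented
# generic file-source option in Spark 3.x, scoped to this read only).
df = (spark.read
      .option('ignoreCorruptFiles', 'true')
      .schema(schema1)
      .parquet(file_save_path))
df.show()
# Expected if file 2 were ignored: only file 1's single record (1, 17).
{code}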

> ignoreCorruptFiles does not work when the schema changes from int to string and a file has more than 35 records
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-36983
>                 URL: https://issues.apache.org/jira/browse/SPARK-36983
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.8, 3.1.2
>            Reporter: mike
>            Priority: Major
>         Attachments: file1.parquet, file2.parquet
>


