xsys created SPARK-39075:
----------------------------

             Summary: IncompatibleSchemaException when selecting data from 
table stored from a DataFrame in Avro format with BYTE/SHORT
                 Key: SPARK-39075
                 URL: https://issues.apache.org/jira/browse/SPARK-39075
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.1
            Reporter: xsys


h3. Describe the bug

We are trying to save a table constructed through a DataFrame with the {{Avro}} 
data format. The table contains {{ByteType}} or {{ShortType}} as part of the 
schema.

When we {{INSERT}} some valid values (e.g. {{{}-128{}}}) and {{SELECT}} from 
the table, we expect it to give back the inserted value. However, we instead 
get an {{IncompatibleSchemaException}} from the {{{}AvroDeserializer{}}}.

This appears to be caused by a missing case statement handling the {{(INT, 
ShortType)}} and {{(INT, ByteType)}} cases in [{{{}AvroDeserializer 
newWriter{}}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321][{{}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321].
h3. To Reproduce

On Spark 3.2.1 (commit {{{}4f25b3f712{}}}), using {{spark-shell}} with the Avro 
package:
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1{{}}
Execute the following:
import org.apache.spark.sql.\{Row, SparkSession}
import org.apache.spark.sql.types._
val schema = new StructType().add(StructField("c1", ShortType, true))
val rdd = sc.parallelize(Seq(Row("-128".toShort)))
val df = spark.createDataFrame(rdd, schema)
df.write.mode("overwrite").format("avro").saveAsTable("t0")
spark.sql("select * from t0;").show(false){{}}
Resulting error:
22/04/27 18:04:14 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 32)   
                                   
org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro type 
\{"type":"record","name":"topLevelRecord","fields":[{"name":"c1","type":["int","null"]}]}
 to SQL type STRUCT<`c1`: SMALLINT>.                                        
        at 
org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:102)
                         
        at 
org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:74)    
                             
        at 
org.apache.spark.sql.avro.AvroFileFormat$$anon$1.<init>(AvroFileFormat.scala:143)
                            
        at 
org.apache.spark.sql.avro.AvroFileFormat.$anonfun$buildReader$1(AvroFileFormat.scala:136)
                    
        at 
org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
                    
        at 
org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:133)
                    
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
                                            
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
 
        at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
 
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)       
                                        
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)                                                                        
                                           
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
                      
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
                     
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)        
                             
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
                             
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)        
                             
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)      
                                        
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)                     
                                        
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)   
                                        
        at org.apache.spark.scheduler.Task.run(Task.scala:131)                  
                                        
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
                             
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)    
                                        
        at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)           
                             
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
                             
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
                             
        at java.lang.Thread.run(Thread.java:748)                                
                                            
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot 
convert Avro field 'c1' to SQL field 'c1' because schema is incompatible 
(avroType = "int", sqlType = SMALLINT) 
        at 
org.apache.spark.sql.avro.AvroDeserializer.newWriter(AvroDeserializer.scala:321)
at 
org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:356)
        
at 
org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:84)
        ... 26 more{{}}
h3. Expected behavior & Possible Solution

We expect the output to successfully select {{{}-128{}}}. We tried other 
formats like Parquet and the outcome is consistent with this expectation.

In the [{{{}AvroSerializer 
newConverter{}}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala#L114][{{}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala#L114],
 {{ByteType}} and {{ShortType}} in Spark are converted to {{INT}} in Avro. It 
can be fixed by adding the {{(INT, ShortType)}} and {{(INT, ByteType)}} cases 
in [{{[AvroDeserializer 
newWriter|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321]}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321]
 so that the writer can handle the case instead of falling to the default error 
case.

The added cases may look like this:
+ case (INT, ShortType) => (updater, ordinal, value) =>
+         updater.setInt(ordinal, value.asInstanceOf[Short]){{}}
h3. Additional context

We are happy to send a PR for this issue if the fix simply entails adding those 
cases and doing a cast similar to the {{(INT, IntegerType)}} case. This fix 
would provide the cases in the deserializer to match the cases converting 
{{ShortType}} and {{ByteType}} to {{INT}} that are handled in the serializer.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to