[ 
https://issues.apache.org/jira/browse/SPARK-39075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Krogen updated SPARK-39075:
--------------------------------
    Description: 
h3. Describe the bug

We are trying to save a table constructed through a DataFrame with the {{Avro}} 
data format. The table contains {{ByteType}} or {{ShortType}} as part of the 
schema.

When we {{INSERT}} some valid values (e.g. {{-128}}) and {{SELECT}} from 
the table, we expect it to give back the inserted value. However, we instead 
get an {{IncompatibleSchemaException}} from the {{AvroDeserializer}}.

This appears to be caused by missing {{(INT, ShortType)}} and {{(INT, ByteType)}} cases in [{{AvroDeserializer newWriter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321].
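
The failure mode described above can be modeled with a small, self-contained sketch. It is not Spark's code: {{DispatchSketch}}, its type objects, and {{IncompatibleSchema}} are hypothetical stand-ins, and it only assumes that the writer is chosen by matching on an (Avro type, SQL type) pair with a default case that rejects anything unlisted.

```scala
// Simplified stand-ins for Avro and Spark SQL types; not the real classes.
object DispatchSketch {
  sealed trait AvroType
  case object INT extends AvroType
  case object LONG extends AvroType

  sealed trait SqlType
  case object ByteType extends SqlType
  case object ShortType extends SqlType
  case object IntegerType extends SqlType
  case object LongType extends SqlType

  // Hypothetical exception standing in for IncompatibleSchemaException.
  final class IncompatibleSchema(msg: String) extends RuntimeException(msg)

  // One case per supported (Avro, SQL) pair, plus a default that rejects
  // every other combination. (INT, ShortType) and (INT, ByteType) are
  // deliberately absent, so they reach the default case.
  def newWriter(avro: AvroType, sql: SqlType): Any => Any =
    (avro, sql) match {
      case (INT, IntegerType) => value => value.asInstanceOf[Int]
      case (LONG, LongType)   => value => value.asInstanceOf[Long]
      case _ =>
        throw new IncompatibleSchema(s"avroType = $avro, sqlType = $sql")
    }
}
```

In this model, requesting a writer for {{(INT, ShortType)}} throws before any row is read, which matches the exception being raised while the deserializer is constructed.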
h3. To Reproduce

On Spark 3.2.1 (commit {{4f25b3f712}}), using {{spark-shell}} with the Avro 
package:
{code:java}
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1{code}
Execute the following:
{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val schema = new StructType().add(StructField("c1", ShortType, true))
val rdd = sc.parallelize(Seq(Row("-128".toShort)))
val df = spark.createDataFrame(rdd, schema)
df.write.mode("overwrite").format("avro").saveAsTable("t0")
spark.sql("select * from t0;").show(false){code}
Resulting error:
{code:java}
22/04/27 18:04:14 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 32)
org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro type {"type":"record","name":"topLevelRecord","fields":[{"name":"c1","type":["int","null"]}]} to SQL type STRUCT<`c1`: SMALLINT>.
    at org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:102)
    at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:74)
    at org.apache.spark.sql.avro.AvroFileFormat$$anon$1.<init>(AvroFileFormat.scala:143)
    at org.apache.spark.sql.avro.AvroFileFormat.$anonfun$buildReader$1(AvroFileFormat.scala:136)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:133)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro field 'c1' to SQL field 'c1' because schema is incompatible (avroType = "int", sqlType = SMALLINT)
    at org.apache.spark.sql.avro.AvroDeserializer.newWriter(AvroDeserializer.scala:321)
    at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:356)
    at org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:84)
    ... 26 more
{code}
h3. Expected behavior & Possible Solution

We expect the output to successfully select {{-128}}. We tried other formats 
like Parquet and the outcome is consistent with this expectation.

In [{{AvroSerializer newConverter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala#L114], 
{{ByteType}} and {{ShortType}} in Spark are converted to {{INT}} in Avro. The 
problem can be fixed by adding the {{(INT, ShortType)}} and {{(INT, ByteType)}} cases 
in [{{AvroDeserializer newWriter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321] 
so that the writer handles these types instead of falling through to the default 
error case.

The added cases may look like this:
{code:java}
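case (INT, ByteType) => (updater, ordinal, value) =>
  updater.setByte(ordinal, value.asInstanceOf[Int].toByte)
case (INT, ShortType) => (updater, ordinal, value) =>
  updater.setShort(ordinal, value.asInstanceOf[Int].toShort){code}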
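
One detail of the cast in such cases can be checked in isolation. The sketch below assumes, as the existing {{(INT, IntegerType)}} case suggests, that an Avro {{int}} reaches the writer as a boxed integer typed as {{Any}}. {{CastSketch}} and its method names are hypothetical, and it models only the cast, not Spark's updater API.

```scala
import scala.util.Try

// Hypothetical helper names; this models only the cast on the decoded value.
object CastSketch {
  // Unbox as Int first, then narrow. Values outside the Short range would wrap.
  def narrowToShort(value: Any): Short = value.asInstanceOf[Int].toShort

  // Same idea for a ByteType column.
  def narrowToByte(value: Any): Byte = value.asInstanceOf[Int].toByte

  // Casting a boxed Integer straight to Short fails at runtime with a
  // ClassCastException, which Try captures as a Failure.
  def directCast(value: Any): Try[Short] = Try(value.asInstanceOf[Short])
}
```

Under that assumption, {{CastSketch.narrowToShort(-128)}} yields {{-128}} as a {{Short}}, while {{CastSketch.directCast(-128)}} is a {{Failure}}, so the deserializer side would likely need to go through {{Int}} before narrowing.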
h3. Additional context

We are happy to send a PR for this issue if the fix simply entails adding those 
cases and doing a cast similar to the {{(INT, IntegerType)}} case. This fix 
would provide the cases in the deserializer to match the cases converting 
{{ShortType}} and {{ByteType}} to {{INT}} that are handled in the serializer.


> IncompatibleSchemaException when selecting data from table stored from a 
> DataFrame in Avro format with BYTE/SHORT
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39075
>                 URL: https://issues.apache.org/jira/browse/SPARK-39075
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: xsys
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
