[ https://issues.apache.org/jira/browse/SPARK-39075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
xsys updated SPARK-39075:
-------------------------
Description:
h3. Describe the bug

We are trying to save a table constructed through a DataFrame in the {{Avro}} data format, where the schema contains a {{ByteType}} or {{ShortType}} column. When we {{INSERT}} a valid value (e.g. {{-128}}) and then {{SELECT}} from the table, we expect to get the inserted value back. Instead, we get an {{IncompatibleSchemaException}} from the {{AvroDeserializer}}. This appears to be caused by missing {{(INT, ShortType)}} and {{(INT, ByteType)}} cases in [{{AvroDeserializer.newWriter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321].

h3. To Reproduce

On Spark 3.2.1 (commit {{4f25b3f712}}), start {{spark-shell}} with the Avro package:
{code}
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1
{code}
Execute the following:
{code:scala}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val schema = new StructType().add(StructField("c1", ShortType, true))
val rdd = sc.parallelize(Seq(Row("-128".toShort)))
val df = spark.createDataFrame(rdd, schema)
df.write.mode("overwrite").format("avro").saveAsTable("t0")
spark.sql("select * from t0;").show(false)
{code}
Resulting error:
{code}
22/04/27 18:04:14 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 32)
org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro type {"type":"record","name":"topLevelRecord","fields":[{"name":"c1","type":["int","null"]}]} to SQL type STRUCT<`c1`: SMALLINT>.
  at org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:102)
  at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:74)
  at org.apache.spark.sql.avro.AvroFileFormat$$anon$1.<init>(AvroFileFormat.scala:143)
  at org.apache.spark.sql.avro.AvroFileFormat.$anonfun$buildReader$1(AvroFileFormat.scala:136)
  at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
  at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:133)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro field 'c1' to SQL field 'c1' because schema is incompatible (avroType = "int", sqlType = SMALLINT)
  at org.apache.spark.sql.avro.AvroDeserializer.newWriter(AvroDeserializer.scala:321)
  at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:356)
  at org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:84)
  ... 26 more
{code}

h3. Expected behavior & Possible Solution

We expect the {{SELECT}} to successfully return {{-128}}. Other formats (e.g. Parquet) behave consistently with this expectation.

In [{{AvroSerializer.newConverter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala#L114], Spark's {{ByteType}} and {{ShortType}} are converted to Avro's {{INT}}. The issue can therefore be fixed by adding {{(INT, ShortType)}} and {{(INT, ByteType)}} cases to [{{AvroDeserializer.newWriter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321] so that the returned writer handles these combinations instead of falling through to the default error case. The added cases may look like this:
{code:scala}
+ case (INT, ShortType) => (updater, ordinal, value) =>
+   updater.setInt(ordinal, value.asInstanceOf[Short])
{code}

h3. Additional context

We are happy to send a PR for this issue if the fix simply entails adding those cases and doing a cast similar to the {{(INT, IntegerType)}} case.
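For completeness, the pair of added cases might be sketched as below. This is a hedged sketch, not a committed patch: it assumes the Avro decoder hands {{newWriter}} a boxed {{java.lang.Integer}} for an Avro {{INT}} (as in the existing {{(INT, IntegerType)}} case) and that the row updater exposes {{setShort}}/{{setByte}} setters; the exact shape of a real fix may differ.
{code:scala}
// Hypothetical cases inside AvroDeserializer.newWriter (Spark 3.2.1 layout).
// An Avro "int" arrives as a boxed java.lang.Integer, so unbox to Int first
// and then narrow explicitly to the Catalyst type.
case (INT, ShortType) => (updater, ordinal, value) =>
  updater.setShort(ordinal, value.asInstanceOf[Int].toShort)

case (INT, ByteType) => (updater, ordinal, value) =>
  updater.setByte(ordinal, value.asInstanceOf[Int].toByte)
{code}
Narrowing through {{Int}} avoids the {{ClassCastException}} that a direct {{value.asInstanceOf[Short]}} could raise on a boxed {{Integer}}, and mirrors the widening the serializer performs in the opposite direction.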
This fix would provide the cases in the deserializer to match the cases converting {{ShortType}} and {{ByteType}} to {{INT}} that are handled in the serializer.

> IncompatibleSchemaException when selecting data from table stored from a
> DataFrame in Avro format with BYTE/SHORT
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39075
>                 URL: https://issues.apache.org/jira/browse/SPARK-39075
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: xsys
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org