[ 
https://issues.apache.org/jira/browse/SPARK-39075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531457#comment-17531457
 ] 

Erik Krogen commented on SPARK-39075:
-------------------------------------

I looked more closely at the code snippet you shared and I now understand that 
the issue is that the data gets written out with an Avro schema of {{INT}}, but 
the Spark schema associated with the table (e.g. {{spark.sources.schema}} 
property) contains a {{ShortType}}, which is then unreadable. I agree that this 
is definitely a bug. The question, then, is which side is wrong: the reader or 
the writer?

In this case I think the best fix is on the writer side: annotate the output 
Avro schema with additional information saying that, although the type is 
{{INT}}, the column actually contains shorts. Avro allows arbitrary properties 
to be attached to schemas; this is the same mechanism Avro uses internally to 
define logical types. We could do something similar, storing a property like 
"actualType = short", which the reader could then use to confirm that the 
downcast is safe.
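
As a rough illustration of the idea above, here is a minimal sketch using the Avro Java API from Scala. The property name {{actualType}} and the helper {{isDeclaredShort}} are assumptions made up for this example; neither Spark nor Avro defines them today. The field is also kept non-nullable for brevity, whereas the schema in the report is a union with {{null}}.

```scala
import org.apache.avro.{Schema, SchemaBuilder}

// Hypothetical property name, chosen only for this sketch.
val ActualTypeProp = "actualType"

// Writer side: an Avro INT annotated to record that its values started as shorts.
val shortAsInt: Schema = Schema.create(Schema.Type.INT)
shortAsInt.addProp(ActualTypeProp, "short")

// A record schema with a single annotated field, mirroring column c1.
val record: Schema = SchemaBuilder.record("topLevelRecord").fields()
  .name("c1").`type`(shortAsInt).noDefault()
  .endRecord()

// The custom property is serialized along with the schema.
println(record.toString(true))

// Reader side (hypothetical helper): narrowing INT to short is known to be
// safe only when the writer left the annotation behind.
def isDeclaredShort(s: Schema): Boolean =
  s.getType == Schema.Type.INT && s.getProp(ActualTypeProp) == "short"

val fieldSchema: Schema = record.getField("c1").schema()
println(isDeclaredShort(fieldSchema))
```

Readers that do not know about the property simply ignore it and still see a plain {{INT}}, so files written this way should stay readable by other Avro consumers.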

> IncompatibleSchemaException when selecting data from table stored from a 
> DataFrame in Avro format with BYTE/SHORT
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39075
>                 URL: https://issues.apache.org/jira/browse/SPARK-39075
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: xsys
>            Priority: Major
>
> h3. Describe the bug
> We are trying to save a table constructed through a DataFrame with the 
> {{Avro}} data format. The table contains {{ByteType}} or {{ShortType}} as 
> part of the schema.
> When we {{INSERT}} some valid values (e.g. {{{}-128{}}}) and {{SELECT}} from 
> the table, we expect it to give back the inserted value. However, we instead 
> get an {{IncompatibleSchemaException}} from the {{{}AvroDeserializer{}}}.
> This appears to be caused by a missing case statement handling the {{(INT, 
> ShortType)}} and {{(INT, ByteType)}} cases in [{{AvroDeserializer 
> newWriter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321].
> h3. To Reproduce
> On Spark 3.2.1 (commit {{{}4f25b3f712{}}}), using {{spark-shell}} with the 
> Avro package:
> {code:java}
> ./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1{code}
> Execute the following:
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val schema = new StructType().add(StructField("c1", ShortType, true))
> val rdd = sc.parallelize(Seq(Row("-128".toShort)))
> val df = spark.createDataFrame(rdd, schema)
> df.write.mode("overwrite").format("avro").saveAsTable("t0")
> spark.sql("select * from t0;").show(false){code}
> Resulting error:
> {code:java}
> 22/04/27 18:04:14 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 32)
> org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro type {"type":"record","name":"topLevelRecord","fields":[{"name":"c1","type":["int","null"]}]} to SQL type STRUCT<`c1`: SMALLINT>.
>     at org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:102)
>     at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:74)
>     at org.apache.spark.sql.avro.AvroFileFormat$$anon$1.<init>(AvroFileFormat.scala:143)
>     at org.apache.spark.sql.avro.AvroFileFormat.$anonfun$buildReader$1(AvroFileFormat.scala:136)
>     at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
>     at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:133)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:131)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro field 'c1' to SQL field 'c1' because schema is incompatible (avroType = "int", sqlType = SMALLINT)
>     at org.apache.spark.sql.avro.AvroDeserializer.newWriter(AvroDeserializer.scala:321)
>     at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:356)
>     at org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:84)
>     ... 26 more
> {code}
> h3. Expected behavior & Possible Solution
> We expect the output to successfully select {{-128}}. We tried other formats 
> like Parquet and the outcome is consistent with this expectation.
> In the [{{AvroSerializer 
> newConverter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala#L114],
>  {{ByteType}} and {{ShortType}} in Spark are converted to {{INT}} in Avro. It 
> can be fixed by adding the {{(INT, ShortType)}} and {{(INT, ByteType)}} cases 
> in [AvroDeserializer 
> newWriter|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321]
>  so that the writer can handle the case instead of falling to the default 
> error case.
> The added cases may look like this:
> {code:java}
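> + case (INT, ShortType) => (updater, ordinal, value) =>
> +   updater.setShort(ordinal, value.asInstanceOf[Int].toShort)
> + case (INT, ByteType) => (updater, ordinal, value) =>
> +   updater.setByte(ordinal, value.asInstanceOf[Int].toByte){code}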
> h3. Additional context
> We are happy to send a PR for this issue if the fix simply entails adding 
> those cases and doing a cast similar to the {{(INT, IntegerType)}} case. This 
> fix would provide the cases in the deserializer to match the cases converting 
> {{ShortType}} and {{ByteType}} to {{INT}} that are handled in the serializer.
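
One detail worth keeping in mind for any such deserializer case: Avro hands {{INT}} values to the reader as boxed integers, so the value has to be unboxed as an {{Int}} before it is narrowed. The snippet below is a small standalone sketch of that behaviour in plain Scala, with no Spark or Avro dependency. The helper {{toShortChecked}} is hypothetical and only illustrates how a reader might guard the downcast when it cannot trust the writer.

```scala
import scala.util.Try

// Stand-in for a value read from an Avro INT field (boxed java.lang.Integer).
val avroValue: Any = Integer.valueOf(-128)

// Casting the boxed Integer straight to Short fails at runtime with a
// ClassCastException, because the box is not a java.lang.Short.
val directCast: Try[Short] = Try {
  val s: Short = avroValue.asInstanceOf[Short]
  s
}

// Unboxing as Int first and then narrowing works.
val narrowed: Short = avroValue.asInstanceOf[Int].toShort

// Hypothetical helper: narrow only when the value fits in a short, so an
// out-of-range INT is reported instead of silently wrapping around.
def toShortChecked(v: Any): Short = {
  val i = v.asInstanceOf[Int]
  require(i >= Short.MinValue && i <= Short.MaxValue,
    s"value $i does not fit in a short")
  i.toShort
}

println(directCast.isFailure)
println(narrowed)
println(Try(toShortChecked(40000)).isFailure)
```

Whether a range check like this is needed depends on whether the reader can rely on the writer having produced only short-range values.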



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
