[ 
https://issues.apache.org/jira/browse/SPARK-39075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531398#comment-17531398
 ] 

xsys commented on SPARK-39075:
------------------------------

Thanks for the response, Erik.

I understand the concern. On the other hand, it is in principle inconsistent and 
confusing that one can write a piece of data but cannot read it back via Spark/Avro. 
That is almost equivalent to data loss.

Moreover, the DataFrame API enforces explicit type checks, so only SHORT/BYTE-typed 
data can be written into a SHORT/BYTE column. In this context the downcast is safe, 
and it does not make sense that Avro's lack of SHORT/BYTE support breaks DataFrame 
operations.
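
For example, trying to put anything other than a Short into a ShortType column already 
fails on the write path. A minimal spark-shell sketch (the exact error wording may 
differ across versions):

{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = new StructType().add(StructField("c1", ShortType, true))

// A genuine Short round-trips through the DataFrame API without issue.
val ok = spark.createDataFrame(sc.parallelize(Seq(Row(42.toShort))), schema)
ok.collect()   // Array([42])

// An Int in the same column is rejected when the row is encoded, with an error
// roughly like: "java.lang.Integer is not a valid external type for schema of smallint"
val bad = spark.createDataFrame(sc.parallelize(Seq(Row(42))), schema)
// bad.collect()   // throws at runtime{code}

So by the time the data reaches the Avro writer, every value in that column is 
guaranteed to fit in a SHORT/BYTE.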

The concern is valid when the source of the serialized data is unknown, since 
downcasting could then be unsafe.

 

One way to address the issue systematically is to determine whether Spark itself 
produced the serialized data, and to permit the cast in that case. Because the SELECT 
API is used, the data is retrieved from a table through Hive or another supported 
Spark store, not from a standalone Avro file. We could then potentially leverage the 
Spark-specific metadata stored with the Hive table and provide this context to the 
deserializer.
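
For instance (illustrative only; the exact property keys differ across Spark versions), 
a table that Spark itself wrote carries the original Catalyst schema in its metastore 
properties, which already distinguishes SMALLINT/TINYINT from INT:

{code:java}
// Spark-written datasource tables keep a serialized copy of the Catalyst schema
// in the Hive table properties (keys such as "spark.sql.sources.schema" or
// "spark.sql.sources.schema.part.N", depending on the Spark version).
spark.sql("SHOW TBLPROPERTIES t0").show(truncate = false)

// The catalog likewise reports the schema Spark recorded, not Avro's widened view:
spark.catalog.listColumns("t0").show(false)   // c1 -> smallint{code}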

Alternatively, we could change the Spark schema type from SHORT/BYTE to INT, as Spark 
SQL already does in the 
[HiveExternalCatalog|https://github.com/apache/spark/blob/4df8512b11dc9cc3a179fd5ccedf91af1f3fc6ee/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala#L821].
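
A minimal sketch of what that widening could look like at the {{StructType}} level 
(a hypothetical helper for illustration, not an existing Spark API):

{code:java}
import org.apache.spark.sql.types._

// Hypothetical helper (illustration only): widen BYTE/SHORT fields to INT
// before the schema reaches the Avro reader, matching what AvroSerializer wrote.
def widenToInt(schema: StructType): StructType = StructType(schema.map {
  case f @ StructField(_, ByteType | ShortType, _, _) => f.copy(dataType = IntegerType)
  case f => f
})

widenToInt(new StructType().add("c1", ShortType))
// StructType(StructField(c1,IntegerType,true)){code}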

> IncompatibleSchemaException when selecting data from table stored from a 
> DataFrame in Avro format with BYTE/SHORT
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39075
>                 URL: https://issues.apache.org/jira/browse/SPARK-39075
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: xsys
>            Priority: Major
>
> h3. Describe the bug
> We are trying to save a table, constructed from a DataFrame, in the 
> {{Avro}} data format. The table contains {{ByteType}} or {{ShortType}} as 
> part of its schema.
> When we {{INSERT}} a valid value (e.g. {{{}-128{}}}) and {{SELECT}} from 
> the table, we expect to get the inserted value back. Instead, we get an 
> {{IncompatibleSchemaException}} from the {{{}AvroDeserializer{}}}.
> This appears to be caused by a missing case statement handling the {{(INT, 
> ShortType)}} and {{(INT, ByteType)}} cases in [{{AvroDeserializer 
> newWriter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321].
> h3. To Reproduce
> On Spark 3.2.1 (commit {{{}4f25b3f712{}}}), using {{spark-shell}} with the 
> Avro package:
> {code:java}
> ./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1{code}
> Execute the following:
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val schema = new StructType().add(StructField("c1", ShortType, true))
> val rdd = sc.parallelize(Seq(Row("-128".toShort)))
> val df = spark.createDataFrame(rdd, schema)
> df.write.mode("overwrite").format("avro").saveAsTable("t0")
> spark.sql("select * from t0;").show(false){code}
> Resulting error:
> {code:java}
> 22/04/27 18:04:14 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 32)
> org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro type {"type":"record","name":"topLevelRecord","fields":[{"name":"c1","type":["int","null"]}]} to SQL type STRUCT<`c1`: SMALLINT>.
> at org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:102)
> at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:74)
> at org.apache.spark.sql.avro.AvroFileFormat$$anon$1.<init>(AvroFileFormat.scala:143)
> at org.apache.spark.sql.avro.AvroFileFormat.$anonfun$buildReader$1(AvroFileFormat.scala:136)
> at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
> at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:133)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
> at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro field 'c1' to SQL field 'c1' because schema is incompatible (avroType = "int", sqlType = SMALLINT)
> at org.apache.spark.sql.avro.AvroDeserializer.newWriter(AvroDeserializer.scala:321)
> at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:356)
> at org.apache.spark.sql.avro.AvroDeserializer.liftedTree1$1(AvroDeserializer.scala:84)
> ... 26 more
> {code}
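> The same failure occurs with {{ByteType}} (a minimal variant of the repro above; the 
> table name {{t1}} is arbitrary):
> {code:java}
> // ByteType hits the analogous missing (INT, ByteType) case in AvroDeserializer
> val schemaB = new StructType().add(StructField("c1", ByteType, true))
> val dfB = spark.createDataFrame(sc.parallelize(Seq(Row("-128".toByte))), schemaB)
> dfB.write.mode("overwrite").format("avro").saveAsTable("t1")
> spark.sql("select * from t1;").show(false){code}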
> h3. Expected behavior & Possible Solution
> We expect the output to successfully select {{-128}}. We tried other formats 
> like Parquet and the outcome is consistent with this expectation.
> In the [{{AvroSerializer 
> newConverter}}|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala#L114],
>  {{ByteType}} and {{ShortType}} in Spark are converted to {{INT}} in Avro. The issue 
> can be fixed by adding {{(INT, ShortType)}} and {{(INT, ByteType)}} cases 
> in [AvroDeserializer 
> newWriter|https://github.com/apache/spark/blob/4f25b3f71238a00508a356591553f2dfa89f8290/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L321]
>  so that the generated field writer handles these combinations instead of falling 
> through to the default error case.
> The added cases may look like this (the value arrives from Avro as an {{Int}}, so it 
> is narrowed back to the declared Catalyst type):
> {code:java}
> + case (INT, ShortType) => (updater, ordinal, value) =>
> +   updater.setShort(ordinal, value.asInstanceOf[Int].toShort)
> + case (INT, ByteType) => (updater, ordinal, value) =>
> +   updater.setByte(ordinal, value.asInstanceOf[Int].toByte){code}
> h3. Additional context
> We are happy to send a PR for this issue if the fix simply entails adding 
> those cases and performing a cast similar to the {{(INT, IntegerType)}} case. The 
> fix would give the deserializer cases that mirror the {{ShortType}}/{{ByteType}}-to-{{INT}} 
> conversions already handled in the serializer.


