[ https://issues.apache.org/jira/browse/SPARK-40409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17611909#comment-17611909 ]

Apache Spark commented on SPARK-40409:
--------------------------------------

User 'zzzzming95' has created a pull request for this issue:
https://github.com/apache/spark/pull/38068

> IncompatibleSchemaException when BYTE stored from DataFrame to Avro is read 
> using spark-sql
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-40409
>                 URL: https://issues.apache.org/jira/browse/SPARK-40409
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 3.2.1
>            Reporter: xsys
>            Priority: Major
>
> h3. Describe the bug
> We are trying to store a BYTE value {{"-128"}} into a table created via a 
> Spark DataFrame. The table is created with the Avro file format. We encounter no 
> errors while creating the table and inserting the aforementioned BYTE value. 
> However, performing a SELECT query on the table through spark-sql results in 
> an {{IncompatibleSchemaException}} as shown below:
> {code:java}
> 2022-09-09 21:15:03,248 ERROR executor.Executor: Exception in task 0.0 in 
> stage 0.0 (TID 0)
> org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro 
> type {"type":"record","name":"topLevelRecord","fields"$
> [{"name":"c1","type":["int","null"]}]} to SQL type STRUCT<`c1`: TINYINT>{code}
> h3. Steps to reproduce
> On Spark 3.2.1 (commit {{{}4f25b3f712{}}}), using {{spark-shell}} with the 
> Avro package:
> {code:java}
> ./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1{code}
> Execute the following:
> {code:java}
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.types._
> val rdd = sc.parallelize(Seq(Row(("-128").toByte)))
> val schema = new StructType().add(StructField("c1", ByteType, true))
> val df = spark.createDataFrame(rdd, schema)
> df.show(false)
> df.write.mode("overwrite").format("avro").saveAsTable("byte_avro"){code}
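> To see what actually lands on disk, one can read the table's Avro files back 
> directly and inspect the inferred schema. This is a minimal sketch, not part 
> of the original report; the {{spark-warehouse/byte_avro}} path assumes the 
> default local warehouse location:
> {code:java}
> // Sketch: load the table's Avro files directly (path assumes the default
> // local warehouse directory) and print the schema Spark infers from Avro.
> val direct = spark.read.format("avro").load("spark-warehouse/byte_avro")
> direct.printSchema()  // c1 is expected to surface as int, not tinyint
> direct.show(false){code}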
> On Spark 3.2.1 (commit {{{}4f25b3f712{}}}), using {{spark-sql}} with the Avro 
> package:
> {code:java}
> ./bin/spark-sql --packages org.apache.spark:spark-avro_2.12:3.2.1{code}
> Execute the following:
> {code:java}
> spark-sql> select * from byte_avro;{code}
> h3. Expected behavior
> We expect the output of the {{SELECT}} query to be {{{}-128{}}}. 
> Additionally, we expect the data type to be preserved; instead, it is widened 
> from BYTE/TINYINT to INT on write, which causes the mismatch on read. We 
> tried other formats such as ORC, and their behavior is consistent with this 
> expectation. Here are the logs from our attempt at doing the same with ORC:
> {code:java}
> scala> df.write.mode("overwrite").format("orc").saveAsTable("byte_orc")
> 2022-09-09 21:38:28,880 WARN conf.HiveConf: HiveConf of name 
> hive.stats.jdbc.timeout does not exist
> 2022-09-09 21:38:28,880 WARN conf.HiveConf: HiveConf of name 
> hive.stats.retries.wait does not exist
> 2022-09-09 21:38:34,642 WARN session.SessionState: METASTORE_FILTER_HOOK will 
> be ignored, since hive.security.authorization.manage
> r is set to instance of HiveAuthorizerFactory.
> 2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name 
> hive.internal.ss.authz.settings.applied.marker does not exist
> 2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name 
> hive.stats.jdbc.timeout does not exist
> 2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name 
> hive.stats.retries.wait does not exist
> scala> spark.sql("select * from byte_orc;")
> res2: org.apache.spark.sql.DataFrame = [c1: tinyint]
> scala> spark.sql("select * from byte_orc;").show(false)
> +----+
> |c1  |
> +----+
> |-128|
> +----+
> {code}
> h3. Root Cause
> h4. [AvroSerializer|https://github.com/apache/spark/blob/v3.2.1/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala#L114-L119]
> {code:java}
>    (catalystType, avroType.getType) match {
>       case (NullType, NULL) =>
>         (getter, ordinal) => null
>       case (BooleanType, BOOLEAN) =>
>         (getter, ordinal) => getter.getBoolean(ordinal)
>       case (ByteType, INT) =>
>         (getter, ordinal) => getter.getByte(ordinal).toInt
>       case (ShortType, INT) =>
>         (getter, ordinal) => getter.getShort(ordinal).toInt
>       case (IntegerType, INT) =>
>         (getter, ordinal) => getter.getInt(ordinal){code}
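> For context, Avro's primitive types include {{int}} but nothing narrower, so 
> a Catalyst ByteType can only be widened on write. A minimal sketch (not part 
> of the report) that makes this mapping visible through the public 
> {{SchemaConverters}} helper shipped with spark-avro:
> {code:java}
> // Sketch: a Catalyst ByteType field converts to an Avro int, matching the
> // record schema quoted in the error message above.
> import org.apache.spark.sql.avro.SchemaConverters
> import org.apache.spark.sql.types._
> 
> val catalystSchema = new StructType().add(StructField("c1", ByteType, true))
> val avroSchema = SchemaConverters.toAvroType(catalystSchema)
> // c1 should appear as a union of int and null
> println(avroSchema.toString(true)){code}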
> h4. [AvroDeserializer|https://github.com/apache/spark/blob/v3.2.1/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L121-L130]
> {code:java}
>     (avroType.getType, catalystType) match {
>       case (NULL, NullType) => (updater, ordinal, _) =>
>         updater.setNullAt(ordinal)
>       // TODO: we can avoid boxing if future version of avro provide primitive accessors.
>       case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
>         updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
>       case (INT, IntegerType) => (updater, ordinal, value) =>
>         updater.setInt(ordinal, value.asInstanceOf[Int])
>       case (INT, DateType) => (updater, ordinal, value) =>
>         updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
> {code}
> AvroSerializer widens Spark's ByteType to Avro's INT on write, since Avro has 
> no single-byte integer type. On read, however, AvroDeserializer maps Avro's 
> INT only to Spark's IntegerType (or DateType), as the snippet above shows; 
> there is no case for ByteType. This mismatch between the user-specified 
> ByteType and the IntegerType that AvroDeserializer expects is the root cause 
> of this issue.
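> One possible direction, sketched below purely as an illustration (it may or 
> may not match the change in the pull request linked above), is to narrow an 
> Avro int back to the Catalyst type the reader asked for, mirroring the 
> widening done in AvroSerializer. A self-contained sketch of that narrowing:
> {code:java}
> // Hypothetical, standalone sketch (not actual Spark code): narrow an Avro int
> // to the requested Catalyst type, i.e. the inverse of AvroSerializer's widening.
> import org.apache.spark.sql.types._
> 
> def narrowAvroInt(catalystType: DataType, value: Int): Any = catalystType match {
>   case ByteType    => value.toByte   // what a TINYINT reader expects
>   case ShortType   => value.toShort  // same idea for SMALLINT
>   case IntegerType => value
>   case other       =>
>     throw new IllegalArgumentException(s"Cannot convert Avro int to $other")
> }
> 
> narrowAvroInt(ByteType, -128)  // returns -128 as a Byte{code}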



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
