[ https://issues.apache.org/jira/browse/SPARK-40409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17611909#comment-17611909 ]
Apache Spark commented on SPARK-40409:
--------------------------------------

User 'zzzzming95' has created a pull request for this issue:
https://github.com/apache/spark/pull/38068

IncompatibleSchemaException when BYTE stored from DataFrame to Avro is read using spark-sql
--------------------------------------------------------------------------------------------

                Key: SPARK-40409
                URL: https://issues.apache.org/jira/browse/SPARK-40409
            Project: Spark
         Issue Type: Bug
         Components: Input/Output
   Affects Versions: 3.2.1
           Reporter: xsys
           Priority: Major

h3. Describe the bug

We are trying to store the BYTE value {{"-128"}} in a table created via a Spark DataFrame. The table is created with the Avro file format. We encounter no errors while creating the table and inserting the aforementioned BYTE value. However, performing a SELECT query on the table through spark-sql results in an {{IncompatibleSchemaException}}, as shown below:

{code:java}
2022-09-09 21:15:03,248 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro type {"type":"record","name":"topLevelRecord","fields":[{"name":"c1","type":["int","null"]}]} to SQL type STRUCT<`c1`: TINYINT>
{code}

h3. Steps to reproduce

On Spark 3.2.1 (commit {{4f25b3f712}}), using {{spark-shell}} with the Avro package:

{code:java}
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1
{code}

Execute the following:

{code:java}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val rdd = sc.parallelize(Seq(Row(("-128").toByte)))
val schema = new StructType().add(StructField("c1", ByteType, true))
val df = spark.createDataFrame(rdd, schema)
df.show(false)
df.write.mode("overwrite").format("avro").saveAsTable("byte_avro")
{code}

On Spark 3.2.1 (commit {{4f25b3f712}}), using {{spark-sql}} with the Avro package:

{code:java}
./bin/spark-sql --packages org.apache.spark:spark-avro_2.12:3.2.1
{code}

Execute the following:

{code:java}
spark-sql> select * from byte_avro;
{code}

h3. Expected behavior

We expect the output of the {{SELECT}} query to be {{-128}}. Additionally, we expect the data type to be preserved (it is changed from BYTE/TINYINT to INT, hence the mismatch). We tried other formats such as ORC, and the outcome is consistent with this expectation. Here are the logs from our attempt at doing the same with ORC:

{code:java}
scala> df.write.mode("overwrite").format("orc").saveAsTable("byte_orc")
2022-09-09 21:38:28,880 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2022-09-09 21:38:28,880 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2022-09-09 21:38:34,642 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist

scala> spark.sql("select * from byte_orc;")
res2: org.apache.spark.sql.DataFrame = [c1: tinyint]

scala> spark.sql("select * from byte_orc;").show(false)
+----+
|c1  |
+----+
|-128|
+----+
{code}
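For reference, one way to confirm that the on-disk type is indeed widened is to load the Avro files underneath the managed table directly and inspect the inferred schema. This is a minimal sketch, not taken from the original report: it assumes the default warehouse layout ({{spark-warehouse/byte_avro}}) and the same spark-avro package as in the reproduction above.

{code:java}
// Sketch only: assumes the table files live under the default warehouse
// directory "spark-warehouse/byte_avro"; adjust the path for your setup.
val raw = spark.read.format("avro").load("spark-warehouse/byte_avro")

// Here the schema is inferred from the Avro files themselves, so c1 should
// appear as integer (from the union ["int","null"]), not as byte/tinyint.
raw.printSchema()
raw.show(false)
{code}

Reading the files directly like this should succeed because the schema comes from the Avro data itself; the failure only shows up when the catalog schema (TINYINT) is applied, which is what the root-cause analysis below points at.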
h3. Root Cause

h4. [AvroSerializer|https://github.com/apache/spark/blob/v3.2.1/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala#L114-L119]

{code:java}
(catalystType, avroType.getType) match {
  case (NullType, NULL) =>
    (getter, ordinal) => null
  case (BooleanType, BOOLEAN) =>
    (getter, ordinal) => getter.getBoolean(ordinal)
  case (ByteType, INT) =>
    (getter, ordinal) => getter.getByte(ordinal).toInt
  case (ShortType, INT) =>
    (getter, ordinal) => getter.getShort(ordinal).toInt
  case (IntegerType, INT) =>
    (getter, ordinal) => getter.getInt(ordinal)
{code}

h4. [AvroDeserializer|https://github.com/apache/spark/blob/v3.2.1/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L121-L130]

{code:java}
(avroType.getType, catalystType) match {
  case (NULL, NullType) => (updater, ordinal, _) =>
    updater.setNullAt(ordinal)

  // TODO: we can avoid boxing if future version of avro provide primitive accessors.
  case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
    updater.setBoolean(ordinal, value.asInstanceOf[Boolean])

  case (INT, IntegerType) => (updater, ordinal, value) =>
    updater.setInt(ordinal, value.asInstanceOf[Int])

  case (INT, DateType) => (updater, ordinal, value) =>
    updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
{code}

AvroSerializer converts Spark's ByteType into Avro's INT, so the written Avro schema records {{c1}} as {{int}}. On the read path, AvroDeserializer only maps Avro's INT back to Spark's IntegerType (or DateType): its match has no (INT, ByteType) branch, so reading the table against the catalog schema STRUCT<`c1`: TINYINT> falls through and raises the {{IncompatibleSchemaException}} shown above. This mismatch between the user-specified ByteType and the IntegerType that AvroDeserializer expects is the root cause of the issue.
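Without asserting that the linked pull request takes exactly this shape, a minimal sketch of the kind of change that would make the read path symmetric with AvroSerializer is to add narrowing branches for the Catalyst types that the serializer widens to Avro INT. The {{setByte}}/{{setShort}} calls below are assumed to be available on the row updater that AvroDeserializer uses; the snippet is hypothetical and not copied from the PR.

{code:java}
// Hypothetical extra branches for AvroDeserializer's (avroType, catalystType)
// match, mirroring AvroSerializer's (ByteType, INT) and (ShortType, INT) cases.
case (INT, ByteType) => (updater, ordinal, value) =>
  // Avro stored the value as a 32-bit int; narrow it back to a byte.
  updater.setByte(ordinal, value.asInstanceOf[Int].toByte)

case (INT, ShortType) => (updater, ordinal, value) =>
  // Same idea for shorts written through the (ShortType, INT) serializer path.
  updater.setShort(ordinal, value.asInstanceOf[Int].toShort)
{code}

With branches along these lines in place, the round trip in the reproduction above would be expected to return {{-128}} as TINYINT instead of raising {{IncompatibleSchemaException}}.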