[ 
https://issues.apache.org/jira/browse/SPARK-40409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xsys updated SPARK-40409:
-------------------------
    Description: 
h3. Describe the bug

We are trying to store a BYTE value ({{-128}}) in a table created via a Spark 
DataFrame. The table is created with the Avro file format. We encounter no 
errors while creating the table or inserting the aforementioned BYTE value. 
However, performing a SELECT query on the table through spark-sql results in an 
{{IncompatibleSchemaException}}, as shown below:
{code:java}
2022-09-09 21:15:03,248 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro type {"type":"record","name":"topLevelRecord","fields":[{"name":"c1","type":["int","null"]}]} to SQL type STRUCT<`c1`: TINYINT>{code}
h3. Steps to reproduce

On Spark 3.2.1 (commit {{4f25b3f712}}), using {{spark-shell}} with the Avro 
package:
{code:java}
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.2.1{code}
Execute the following:
{code:java}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
// Build a single-row DataFrame with one BYTE (TINYINT) column.
val rdd = sc.parallelize(Seq(Row(("-128").toByte)))
val schema = new StructType().add(StructField("c1", ByteType, true))
val df = spark.createDataFrame(rdd, schema)
df.show(false)
// Save the DataFrame as an Avro-backed table.
df.write.mode("overwrite").format("avro").saveAsTable("byte_avro"){code}
On Spark 3.2.1 (commit {{4f25b3f712}}), using {{spark-sql}} with the Avro 
package:
{code:java}
./bin/spark-sql --packages org.apache.spark:spark-avro_2.12:3.2.1{code}
Execute the following:
{code:java}
spark-sql> select * from byte_avro;{code}
h3. Expected behavior

We expect the output of the {{SELECT}} query to be {{-128}}. Additionally, 
we expect the data type to be preserved (it is instead changed from BYTE/TINYINT 
to INT, hence the mismatch). Other formats such as ORC behave consistently with 
this expectation. Here are the logs from our attempt at doing the same with ORC:
{code:java}
scala> df.write.mode("overwrite").format("orc").saveAsTable("byte_orc")
2022-09-09 21:38:28,880 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2022-09-09 21:38:28,880 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2022-09-09 21:38:34,642 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2022-09-09 21:38:34,716 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
scala> spark.sql("select * from byte_orc;")
res2: org.apache.spark.sql.DataFrame = [c1: tinyint]
scala> spark.sql("select * from byte_orc;").show(false)
+----+
|c1  |
+----+
|-128|
+----+
{code}
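A workaround we considered (our own sketch, not something the Spark docs prescribe): widen the column to INT before writing, so both the write and read paths stay on the supported (IntegerType, INT) mapping described in the Root Cause section below, and cast back to TINYINT on read. The table name {{byte_avro_int}} is hypothetical; {{df}} is the DataFrame from the repro above:
{code:java}
import org.apache.spark.sql.functions.col

// Widen BYTE -> INT before writing so the Avro round trip uses the
// supported IntegerType <-> INT mapping.
val dfInt = df.withColumn("c1", col("c1").cast("int"))
dfInt.write.mode("overwrite").format("avro").saveAsTable("byte_avro_int")

// Cast back to TINYINT on read; values that originated in a byte column
// always fit.
spark.table("byte_avro_int").select(col("c1").cast("tinyint").as("c1")).show(false){code}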
h3. Root Cause
h4. [AvroSerializer|https://github.com/apache/spark/blob/v3.2.1/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala#L114-L119]
{code:java}
   (catalystType, avroType.getType) match {
      case (NullType, NULL) =>
        (getter, ordinal) => null
      case (BooleanType, BOOLEAN) =>
        (getter, ordinal) => getter.getBoolean(ordinal)
      case (ByteType, INT) =>
        (getter, ordinal) => getter.getByte(ordinal).toInt
      case (ShortType, INT) =>
        (getter, ordinal) => getter.getShort(ordinal).toInt
      case (IntegerType, INT) =>
        (getter, ordinal) => getter.getInt(ordinal){code}
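The ByteType-to-INT widening can also be observed directly through the public {{SchemaConverters}} API (a minimal illustration, assuming the spark-avro package is on the classpath; the printed schema is our expectation, not captured output):
{code:java}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.ByteType

// Avro has no single-byte integer type, so Catalyst's ByteType is widened
// to Avro's "int" during schema conversion.
val avroSchema = SchemaConverters.toAvroType(ByteType, nullable = true)
println(avroSchema)
// expected: ["int","null"]{code}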
h4. [AvroDeserializer|https://github.com/apache/spark/blob/v3.2.1/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L121-L130]
{code:java}
    (avroType.getType, catalystType) match {
      case (NULL, NullType) => (updater, ordinal, _) =>
        updater.setNullAt(ordinal)
      // TODO: we can avoid boxing if future version of avro provide primitive accessors.
      case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
        updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
      case (INT, IntegerType) => (updater, ordinal, value) =>
        updater.setInt(ordinal, value.asInstanceOf[Int])
      case (INT, DateType) => (updater, ordinal, value) =>
        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
{code}
AvroSerializer converts Spark's ByteType into Avro's INT, since Avro has no 
single-byte integer type. On the read path, however, AvroDeserializer only maps 
Avro's INT to Spark's IntegerType (or DateType); there is no case for ByteType. 
This mismatch between the user-specified ByteType and the IntegerType that 
AvroDeserializer expects is the root cause of the {{IncompatibleSchemaException}}.
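A minimal sketch of one possible fix (our suggestion, untested, not an official patch): add narrowing cases to the deserializer's match shown above, assuming the {{CatalystDataUpdater}} passed to the writer exposes {{setByte}}/{{setShort}} setters mirroring {{InternalRow}}:
{code:java}
// Hypothetical additional cases for AvroDeserializer, narrowing Avro INT
// back to the Catalyst type requested by the reader.
case (INT, ByteType) => (updater, ordinal, value) =>
  // Safe for data written by AvroSerializer from a ByteType column; data
  // produced outside Spark could overflow and may warrant a range check.
  updater.setByte(ordinal, value.asInstanceOf[Int].toByte)
case (INT, ShortType) => (updater, ordinal, value) =>
  updater.setShort(ordinal, value.asInstanceOf[Int].toShort){code}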

 

 

> IncompatibleSchemaException when BYTE stored from DataFrame to Avro is read 
> using spark-sql
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-40409
>                 URL: https://issues.apache.org/jira/browse/SPARK-40409
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 3.2.1
>            Reporter: xsys
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
