[jira] [Comment Edited] (SPARK-26836) Columns get switched in Spark SQL using Avro backed Hive table if schema evolves

2021-01-08 Thread Attila Zsolt Piros (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-26836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17261398#comment-17261398
 ] 

Attila Zsolt Piros edited comment on SPARK-26836 at 1/8/21, 4:15 PM:
-

I am working on this and a PR can be expected this weekend / next week.


was (Author: attilapiros):
I am working on this

> Columns get switched in Spark SQL using Avro backed Hive table if schema 
> evolves
> 
>
> Key: SPARK-26836
> URL: https://issues.apache.org/jira/browse/SPARK-26836
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1, 2.4.0
> Environment: I tested with Hive and HCatalog running version 
> 2.3.4, and with Spark 2.3.1 and 2.4
>Reporter: Tamás Németh
>Priority: Critical
>  Labels: correctness
> Attachments: doctors.avro, doctors_evolved.avro, 
> doctors_evolved.json, original.avsc
>
>
> I have a hive avro table where the avro schema is stored on s3 next to the 
> avro files. 
> In the table definition the avro.schema.url always points to the latest 
> partition's _schema.avsc file, which is always the latest schema. (Avro schemas 
> are backward and forward compatible within a table.)
> When new data comes in, I always add a new partition where the 
> avro.schema.url property is also set to the _schema.avsc that was used when 
> the partition was added, and of course I always update the table's 
> avro.schema.url property to the latest one.
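> In Hive DDL terms this workflow is roughly the following (a sketch using the 
> table name and paths from the reproduction below; the report doesn't include 
> the exact commands):
> {code:java}
> -- register the new partition, which sits next to its own _schema.avsc
> ALTER TABLE default.doctors ADD PARTITION (dt='2019-02-06')
>   LOCATION 's3://somelocation/doctors/dt=2019-02-06/';
> -- point the table-level schema at the newest _schema.avsc
> ALTER TABLE default.doctors SET SERDEPROPERTIES (
>   'avro.schema.url'='s3://somelocation/doctors/dt=2019-02-06/_schema.avsc');
> {code}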
> Querying this table works fine until the schema evolves in a way that adds a 
> new optional property in the middle. 
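> For illustration, such an evolution looks roughly like this (a sketch based on 
> the column names in the output below; the exact schemas are in the attached 
> original.avsc and doctors_evolved.json):
> {code:java}
> {"type": "record", "name": "doctors", "fields": [
>   {"name": "number", "type": "int"},
>   {"name": "first_name", "type": "string"},
>   {"name": "last_name", "type": "string"}
> ]}
> {code}
> After evolution, an optional (nullable, defaulted) field appears in the middle:
> {code:java}
> {"type": "record", "name": "doctors", "fields": [
>   {"name": "number", "type": "int"},
>   {"name": "extra_field", "type": ["null", "string"], "default": null},
>   {"name": "first_name", "type": "string"},
>   {"name": "last_name", "type": "string"}
> ]}
> {code}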
> When this happens, after the spark sql query the columns in the old 
> partition get mixed up and show the wrong data.
> If I query the table with Hive, everything is perfectly fine and it gives 
> me back the correct columns, both for the partitions created with the old 
> schema and for the new ones created with the evolved schema.
>  
> Here is how I could reproduce it with the 
> [doctors.avro|https://github.com/apache/spark/blob/master/sql/hive/src/test/resources/data/files/doctors.avro]
>  example data in the sql test suite.
>  # I have created two partition folders:
> {code:java}
> [hadoop@ip-192-168-10-158 hadoop]$ hdfs dfs -ls s3://somelocation/doctors/*/
> Found 2 items
> -rw-rw-rw- 1 hadoop hadoop 418 2019-02-06 12:48 s3://somelocation/doctors/dt=2019-02-05/_schema.avsc
> -rw-rw-rw- 1 hadoop hadoop 521 2019-02-06 12:13 s3://somelocation/doctors/dt=2019-02-05/doctors.avro
> Found 2 items
> -rw-rw-rw- 1 hadoop hadoop 580 2019-02-06 12:49 s3://somelocation/doctors/dt=2019-02-06/_schema.avsc
> -rw-rw-rw- 1 hadoop hadoop 577 2019-02-06 12:13 s3://somelocation/doctors/dt=2019-02-06/doctors_evolved.avro{code}
> Here the first partition had data created with the schema before 
> evolving, and the second one had the evolved data. (The evolved schema is the 
> same as in your testcase, except that I moved the extra_field column from the 
> second position to the last, and I generated two lines of avro data with the 
> evolved schema.)
>  # I have created a hive table with the following command:
>  
> {code:java}
> CREATE EXTERNAL TABLE `default.doctors`
>  PARTITIONED BY (
>  `dt` string
>  )
>  ROW FORMAT SERDE
>  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>  WITH SERDEPROPERTIES (
>  'avro.schema.url'='s3://somelocation/doctors/dt=2019-02-06/_schema.avsc')
>  STORED AS INPUTFORMAT
>  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>  OUTPUTFORMAT
>  'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>  LOCATION
>  's3://somelocation/doctors/'
>  TBLPROPERTIES (
>  'transient_lastDdlTime'='1538130975'){code}
>  
> Here, as you can see, the table's schema url points to the latest schema.
> 3. I ran an _msck repair table_ to pick up all the partitions.
> Fyi: if I run my select * query at this point, everything is fine and no 
> column switching happens.
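> For reference, a sketch of that command, assuming the table name from step 2:
> {code:java}
> MSCK REPAIR TABLE default.doctors;
> {code}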
> 4. Then I changed the first partition's avro.schema.url to point to the 
> schema under the partition folder (the non-evolved one: 
> s3://somelocation/doctors/dt=2019-02-05/_schema.avsc).
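> The exact command isn't shown in the report, but changing a per-partition serde 
> property can be done along these lines (hypothetical sketch):
> {code:java}
> ALTER TABLE default.doctors PARTITION (dt='2019-02-05')
>   SET SERDEPROPERTIES (
>     'avro.schema.url'='s3://somelocation/doctors/dt=2019-02-05/_schema.avsc');
> {code}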
> If you then run a _select * from default.doctors_, the columns get 
> mixed up (in the data below the first_name values end up under the extra_field 
> column; I guess because in the latest schema it is the second column):
>  
> {code:java}
> number,extra_field,first_name,last_name,dt 
> 6,Colin,Baker,null,2019-02-05 
> 3,Jon,Pertwee,null,2019-02-05 
> 4,Tom,Baker,null,2019-02-05 
> 5,Peter,Davison,null,2019-02-05 
> 11,Matt,Smith,null,2019-02-05 
> 1,William,Hartnell,null,2019-02-05 
> 7,Sylvester,McCoy,null,2019-02-05 
> 8,Paul,McGann,null,2019-02-05 
> 2,Patrick,Troughton,null,2019-02-05 
> 9,Chris

[jira] [Comment Edited] (SPARK-26836) Columns get switched in Spark SQL using Avro backed Hive table if schema evolves

2019-06-20 Thread david ravet (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16868414#comment-16868414
 ] 

david ravet edited comment on SPARK-26836 at 6/20/19 10:05 AM:
---

Hi,

 

We encountered the same issue with Spark 2.2.0 when reading avro from a partition 
where avro.schema.url points to a new schema (forward compatibility) but the 
stored avro files were written with an older schema.
Querying with the Hive client is OK, but with Spark SQL we get:
{code}
19/06/20 10:43:47 WARN avro.AvroDeserializer: Received different schemas.  Have to re-encode: {"type":"record","name":"KeyValuePair","namespace":"org.apache.avro.mapreduce","doc":"A key/value pair","fields":[{"name":"key","type": .
SIZE{4207b7ac:16b740e5ed1:-8000=org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder@17173314} ID 4207b7ac:16b740e5ed1:-8000
19/06/20 10:43:47 ERROR executor.Executor: Exception in task 6.0 in stage 1.0 (TID 22)
java.lang.RuntimeException: Hive internal error: conversion of double to struct<...> not supported yet.
	at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$StructConverter.<init>(ObjectInspectorConverters.java:380)
	at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.getConverter(ObjectInspectorConverters.java:155)
	at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$StructConverter.<init>(ObjectInspectorConverters.java:374)
	at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.getConverter(ObjectInspectorConverters.java:155)
	at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$ListConverter.convert(ObjectInspectorConverters.java:331)
	at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$StructConverter.convert(ObjectInspectorConverters.java:396)
	at org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$StructConverter.convert(ObjectInspectorConverters.java:396)
	at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:430)
	at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:429)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}


was (Author: david_ravet):
Hi,

 

We encountered the same issue with Spark 2.2.0 when reading avro from a partition 
where avro.schema.url points to a new schema (forward compatibility) but the 
stored avro files were written with an older schema.
